diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index a4d7430f9b..ec634c1bf9 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -22,9 +22,8 @@ from synapse.storage.event_push_actions import EventPushActionsWorkerStore
from synapse.storage.events_worker import EventsWorkerStore
from synapse.storage.roommember import RoomMemberWorkerStore
from synapse.storage.state import StateGroupWorkerStore
-from synapse.storage.stream import StreamStore
+from synapse.storage.stream import StreamWorkerStore
from synapse.storage.signatures import SignatureStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
@@ -41,33 +40,18 @@ logger = logging.getLogger(__name__)
class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
- EventsWorkerStore, StateGroupWorkerStore,
+ StreamWorkerStore, EventsWorkerStore, StateGroupWorkerStore,
BaseSlavedStore):
def __init__(self, db_conn, hs):
- super(SlavedEventStore, self).__init__(db_conn, hs)
self._stream_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering",
)
self._backfill_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering", step=-1
)
- events_max = self._stream_id_gen.get_current_token()
- event_cache_prefill, min_event_val = self._get_cache_dict(
- db_conn, "events",
- entity_column="room_id",
- stream_column="stream_ordering",
- max_value=events_max,
- )
- self._events_stream_cache = StreamChangeCache(
- "EventsRoomStreamChangeCache", min_event_val,
- prefilled_cache=event_cache_prefill,
- )
- self._membership_stream_cache = StreamChangeCache(
- "MembershipStreamChangeCache", events_max,
- )
- self._stream_order_on_start = self.get_room_max_stream_ordering()
+ super(SlavedEventStore, self).__init__(db_conn, hs)
# Cached functions can't be accessed through a class instance so we need
# to reach inside the __dict__ to extract them.
@@ -75,30 +59,6 @@ class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
"get_latest_event_ids_in_room"
]
- get_recent_event_ids_for_room = (
- StreamStore.__dict__["get_recent_event_ids_for_room"]
- )
- has_room_changed_since = DataStore.has_room_changed_since.__func__
-
- get_membership_changes_for_user = (
- DataStore.get_membership_changes_for_user.__func__
- )
- get_room_events_max_id = DataStore.get_room_events_max_id.__func__
- get_room_events_stream_for_room = (
- DataStore.get_room_events_stream_for_room.__func__
- )
- get_events_around = DataStore.get_events_around.__func__
-
- get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__
- get_room_events_stream_for_rooms = (
- DataStore.get_room_events_stream_for_rooms.__func__
- )
- get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
-
- _set_before_and_after = staticmethod(DataStore._set_before_and_after)
-
- _get_events_around_txn = DataStore._get_events_around_txn.__func__
-
get_backfill_events = DataStore.get_backfill_events.__func__
_get_backfill_events = DataStore._get_backfill_events.__func__
get_missing_events = DataStore.get_missing_events.__func__
@@ -119,8 +79,11 @@ class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
- get_federation_out_pos = DataStore.get_federation_out_pos.__func__
- update_federation_out_pos = DataStore.update_federation_out_pos.__func__
+ def get_room_max_stream_ordering(self):
+ return self._stream_id_gen.get_current_token()
+
+ def get_room_min_stream_ordering(self):
+ return self._backfill_id_gen.get_current_token()
get_latest_event_ids_and_hashes_in_room = (
DataStore.get_latest_event_ids_and_hashes_in_room.__func__
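
Note on the hunk above: the old slave store copied methods off `DataStore` one at a time via `__dict__` lookups and `__func__` unwrapping; inheriting from `StreamWorkerStore` replaces most of that boilerplate. The `super(...).__init__` call also moves below the `SlavedIdTracker` setup, because the base class constructor immediately calls `get_room_max_stream_ordering()`, which reads `self._stream_id_gen`. A minimal, self-contained sketch of that ordering constraint (`FakeIdTracker` and the class names here are illustrative stand-ins, not Synapse APIs):

    import abc

    class FakeIdTracker(object):
        # Illustrative stand-in for SlavedIdTracker.
        def get_current_token(self):
            return 42

    class WorkerStoreSketch(object):
        __metaclass__ = abc.ABCMeta  # Python 2 style, as in the patch

        def __init__(self, db_conn, hs):
            # The base constructor consumes the subclass-provided token,
            # so the subclass must be ready to answer before chaining up.
            self._stream_order_on_start = self.get_room_max_stream_ordering()

        @abc.abstractmethod
        def get_room_max_stream_ordering(self):
            raise NotImplementedError()

    class SlavedStoreSketch(WorkerStoreSketch):
        def __init__(self, db_conn, hs):
            # Set up the ID tracker first: super().__init__ calls
            # get_room_max_stream_ordering(), which needs it.
            self._stream_id_gen = FakeIdTracker()
            super(SlavedStoreSketch, self).__init__(db_conn, hs)

        def get_room_max_stream_ordering(self):
            return self._stream_id_gen.get_current_token()

    store = SlavedStoreSketch(db_conn=None, hs=None)
    assert store._stream_order_on_start == 42
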
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index b3cdcfdc21..de00cae447 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -140,22 +140,6 @@ class DataStore(RoomMemberStore, RoomStore,
else:
self._cache_id_gen = None
- events_max = self._stream_id_gen.get_current_token()
- event_cache_prefill, min_event_val = self._get_cache_dict(
- db_conn, "events",
- entity_column="room_id",
- stream_column="stream_ordering",
- max_value=events_max,
- )
- self._events_stream_cache = StreamChangeCache(
- "EventsRoomStreamChangeCache", min_event_val,
- prefilled_cache=event_cache_prefill,
- )
-
- self._membership_stream_cache = StreamChangeCache(
- "MembershipStreamChangeCache", events_max,
- )
-
self._presence_on_startup = self._get_active_presence(db_conn)
presence_cache_prefill, min_presence_val = self._get_cache_dict(
@@ -203,6 +187,7 @@ class DataStore(RoomMemberStore, RoomStore,
"DeviceListFederationStreamChangeCache", device_list_max,
)
+ events_max = self._stream_id_gen.get_current_token()
curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict(
db_conn, "current_state_delta_stream",
entity_column="room_id",
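
The cache construction deleted from `DataStore.__init__` here is not lost: it moves into `StreamWorkerStore.__init__` (see the stream.py hunks below), so the master store and the slaved store now build their stream change caches through the same code path. Only `events_max` is recomputed later in `DataStore.__init__`, because the `current_state_delta_stream` prefill still needs it. A rough sketch of what the prefill buys, using a simplified stand-in rather than the real `StreamChangeCache`:

    class SimpleStreamChangeCache(object):
        """Toy version of a stream change cache, for illustration only."""
        def __init__(self, name, min_known_stream_pos, prefilled_cache=None):
            self._name = name
            self._earliest_known = min_known_stream_pos
            # entity -> last stream position at which it changed
            self._cache = dict(prefilled_cache or {})

        def has_entity_changed(self, entity, stream_pos):
            if stream_pos < self._earliest_known:
                # No data that far back; conservatively assume a change.
                return True
            return self._cache.get(entity, self._earliest_known) > stream_pos

    # Prefill as _get_cache_dict would: recent (room_id -> stream_ordering).
    prefill = {"!room_a:hs": 100, "!room_b:hs": 95}
    cache = SimpleStreamChangeCache("EventsRoomStreamChangeCache", 90,
                                    prefilled_cache=prefill)
    assert cache.has_entity_changed("!room_a:hs", 99)      # changed at 100
    assert not cache.has_entity_changed("!room_b:hs", 95)  # nothing after 95
    assert cache.has_entity_changed("!room_c:hs", 80)      # before horizon
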
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 52bdce5be2..a2527d2a36 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -35,13 +35,17 @@ what sort order was used:
from twisted.internet import defer
-from ._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore
+from synapse.storage.events import EventsWorkerStore
+
from synapse.util.caches.descriptors import cached
from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken
+from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+import abc
import logging
@@ -143,81 +147,41 @@ def filter_to_clause(event_filter):
return " AND ".join(clauses), args
-class StreamStore(SQLBaseStore):
- @defer.inlineCallbacks
- def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
- # NB this lives here instead of appservice.py so we can reuse the
- # 'private' StreamToken class in this file.
- if limit:
- limit = max(limit, MAX_STREAM_SIZE)
- else:
- limit = MAX_STREAM_SIZE
+class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
+    """This is an abstract base class; subclasses must implement
+    `get_room_max_stream_ordering` and `get_room_min_stream_ordering`,
+    which must be callable during the initializer.
+    """
- # From and to keys should be integers from ordering.
- from_id = RoomStreamToken.parse_stream_token(from_key)
- to_id = RoomStreamToken.parse_stream_token(to_key)
+ __metaclass__ = abc.ABCMeta
- if from_key == to_key:
- defer.returnValue(([], to_key))
- return
+ def __init__(self, db_conn, hs):
+ super(StreamWorkerStore, self).__init__(db_conn, hs)
- # select all the events between from/to with a sensible limit
- sql = (
- "SELECT e.event_id, e.room_id, e.type, s.state_key, "
- "e.stream_ordering FROM events AS e "
- "LEFT JOIN state_events as s ON "
- "e.event_id = s.event_id "
- "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
- "ORDER BY stream_ordering ASC LIMIT %(limit)d "
- ) % {
- "limit": limit
- }
-
- def f(txn):
- # pull out all the events between the tokens
- txn.execute(sql, (from_id.stream, to_id.stream,))
- rows = self.cursor_to_dict(txn)
-
- # Logic:
- # - We want ALL events which match the AS room_id regex
- # - We want ALL events which match the rooms represented by the AS
- # room_alias regex
- # - We want ALL events for rooms that AS users have joined.
- # This is currently supported via get_app_service_rooms (which is
- # used for the Notifier listener rooms). We can't reasonably make a
- # SQL query for these room IDs, so we'll pull all the events between
- # from/to and filter in python.
- rooms_for_as = self._get_app_service_rooms_txn(txn, service)
- room_ids_for_as = [r.room_id for r in rooms_for_as]
-
- def app_service_interested(row):
- if row["room_id"] in room_ids_for_as:
- return True
-
- if row["type"] == EventTypes.Member:
- if service.is_interested_in_user(row.get("state_key")):
- return True
- return False
-
- return [r for r in rows if app_service_interested(r)]
-
- rows = yield self.runInteraction("get_appservice_room_stream", f)
-
- ret = yield self._get_events(
- [r["event_id"] for r in rows],
- get_prev_content=True
+ events_max = self.get_room_max_stream_ordering()
+ event_cache_prefill, min_event_val = self._get_cache_dict(
+ db_conn, "events",
+ entity_column="room_id",
+ stream_column="stream_ordering",
+ max_value=events_max,
+ )
+ self._events_stream_cache = StreamChangeCache(
+ "EventsRoomStreamChangeCache", min_event_val,
+ prefilled_cache=event_cache_prefill,
+ )
+ self._membership_stream_cache = StreamChangeCache(
+ "MembershipStreamChangeCache", events_max,
)
- self._set_before_and_after(ret, rows, topo_order=from_id is None)
+ self._stream_order_on_start = self.get_room_max_stream_ordering()
- if rows:
- key = "s%d" % max(r["stream_ordering"] for r in rows)
- else:
- # Assume we didn't get anything because there was nothing to
- # get.
- key = to_key
+ @abc.abstractmethod
+ def get_room_max_stream_ordering(self):
+ raise NotImplementedError()
- defer.returnValue((ret, key))
+ @abc.abstractmethod
+ def get_room_min_stream_ordering(self):
+ raise NotImplementedError()
@defer.inlineCallbacks
def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
@@ -381,88 +345,6 @@ class StreamStore(SQLBaseStore):
defer.returnValue(ret)
@defer.inlineCallbacks
- def paginate_room_events(self, room_id, from_key, to_key=None,
- direction='b', limit=-1, event_filter=None):
- # Tokens really represent positions between elements, but we use
- # the convention of pointing to the event before the gap. Hence
- # we have a bit of asymmetry when it comes to equalities.
- args = [False, room_id]
- if direction == 'b':
- order = "DESC"
- bounds = upper_bound(
- RoomStreamToken.parse(from_key), self.database_engine
- )
- if to_key:
- bounds = "%s AND %s" % (bounds, lower_bound(
- RoomStreamToken.parse(to_key), self.database_engine
- ))
- else:
- order = "ASC"
- bounds = lower_bound(
- RoomStreamToken.parse(from_key), self.database_engine
- )
- if to_key:
- bounds = "%s AND %s" % (bounds, upper_bound(
- RoomStreamToken.parse(to_key), self.database_engine
- ))
-
- filter_clause, filter_args = filter_to_clause(event_filter)
-
- if filter_clause:
- bounds += " AND " + filter_clause
- args.extend(filter_args)
-
- if int(limit) > 0:
- args.append(int(limit))
- limit_str = " LIMIT ?"
- else:
- limit_str = ""
-
- sql = (
- "SELECT * FROM events"
- " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
- " ORDER BY topological_ordering %(order)s,"
- " stream_ordering %(order)s %(limit)s"
- ) % {
- "bounds": bounds,
- "order": order,
- "limit": limit_str
- }
-
- def f(txn):
- txn.execute(sql, args)
-
- rows = self.cursor_to_dict(txn)
-
- if rows:
- topo = rows[-1]["topological_ordering"]
- toke = rows[-1]["stream_ordering"]
- if direction == 'b':
- # Tokens are positions between events.
- # This token points *after* the last event in the chunk.
- # We need it to point to the event before it in the chunk
- # when we are going backwards so we subtract one from the
- # stream part.
- toke -= 1
- next_token = str(RoomStreamToken(topo, toke))
- else:
- # TODO (erikj): We should work out what to do here instead.
- next_token = to_key if to_key else from_key
-
- return rows, next_token,
-
- rows, token = yield self.runInteraction("paginate_room_events", f)
-
- events = yield self._get_events(
- [r["event_id"] for r in rows],
- get_prev_content=True
- )
-
- self._set_before_and_after(events, rows)
-
- defer.returnValue((events, token))
-
- @defer.inlineCallbacks
def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
rows, token = yield self.get_recent_event_ids_for_room(
room_id, limit, end_token, from_token
@@ -542,7 +424,7 @@ class StreamStore(SQLBaseStore):
`room_id` causes it to return the current room specific topological
token.
"""
- token = yield self._stream_id_gen.get_current_token()
+ token = yield self.get_room_max_stream_ordering()
if room_id is None:
defer.returnValue("s%d" % (token,))
else:
@@ -552,12 +434,6 @@ class StreamStore(SQLBaseStore):
)
defer.returnValue("t%d-%d" % (topo, token))
- def get_room_max_stream_ordering(self):
- return self._stream_id_gen.get_current_token()
-
- def get_room_min_stream_ordering(self):
- return self._backfill_id_gen.get_current_token()
-
def get_stream_token_for_event(self, event_id):
"""The stream token for an event
Args:
@@ -832,3 +708,168 @@ class StreamStore(SQLBaseStore):
def has_room_changed_since(self, room_id, stream_id):
return self._events_stream_cache.has_entity_changed(room_id, stream_id)
+
+
+class StreamStore(StreamWorkerStore):
+ def get_room_max_stream_ordering(self):
+ return self._stream_id_gen.get_current_token()
+
+ def get_room_min_stream_ordering(self):
+ return self._backfill_id_gen.get_current_token()
+
+ @defer.inlineCallbacks
+ def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
+ # NB this lives here instead of appservice.py so we can reuse the
+ # 'private' StreamToken class in this file.
+ if limit:
+ limit = max(limit, MAX_STREAM_SIZE)
+ else:
+ limit = MAX_STREAM_SIZE
+
+ # From and to keys should be integers from ordering.
+ from_id = RoomStreamToken.parse_stream_token(from_key)
+ to_id = RoomStreamToken.parse_stream_token(to_key)
+
+ if from_key == to_key:
+ defer.returnValue(([], to_key))
+ return
+
+ # select all the events between from/to with a sensible limit
+ sql = (
+ "SELECT e.event_id, e.room_id, e.type, s.state_key, "
+ "e.stream_ordering FROM events AS e "
+ "LEFT JOIN state_events as s ON "
+ "e.event_id = s.event_id "
+ "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
+ "ORDER BY stream_ordering ASC LIMIT %(limit)d "
+ ) % {
+ "limit": limit
+ }
+
+ def f(txn):
+ # pull out all the events between the tokens
+ txn.execute(sql, (from_id.stream, to_id.stream,))
+ rows = self.cursor_to_dict(txn)
+
+ # Logic:
+ # - We want ALL events which match the AS room_id regex
+ # - We want ALL events which match the rooms represented by the AS
+ # room_alias regex
+ # - We want ALL events for rooms that AS users have joined.
+ # This is currently supported via get_app_service_rooms (which is
+ # used for the Notifier listener rooms). We can't reasonably make a
+ # SQL query for these room IDs, so we'll pull all the events between
+ # from/to and filter in python.
+ rooms_for_as = self._get_app_service_rooms_txn(txn, service)
+ room_ids_for_as = [r.room_id for r in rooms_for_as]
+
+ def app_service_interested(row):
+ if row["room_id"] in room_ids_for_as:
+ return True
+
+ if row["type"] == EventTypes.Member:
+ if service.is_interested_in_user(row.get("state_key")):
+ return True
+ return False
+
+ return [r for r in rows if app_service_interested(r)]
+
+ rows = yield self.runInteraction("get_appservice_room_stream", f)
+
+ ret = yield self._get_events(
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
+
+ self._set_before_and_after(ret, rows, topo_order=from_id is None)
+
+ if rows:
+ key = "s%d" % max(r["stream_ordering"] for r in rows)
+ else:
+ # Assume we didn't get anything because there was nothing to
+ # get.
+ key = to_key
+
+ defer.returnValue((ret, key))
+
+ @defer.inlineCallbacks
+ def paginate_room_events(self, room_id, from_key, to_key=None,
+ direction='b', limit=-1, event_filter=None):
+ # Tokens really represent positions between elements, but we use
+ # the convention of pointing to the event before the gap. Hence
+ # we have a bit of asymmetry when it comes to equalities.
+ args = [False, room_id]
+ if direction == 'b':
+ order = "DESC"
+ bounds = upper_bound(
+ RoomStreamToken.parse(from_key), self.database_engine
+ )
+ if to_key:
+ bounds = "%s AND %s" % (bounds, lower_bound(
+ RoomStreamToken.parse(to_key), self.database_engine
+ ))
+ else:
+ order = "ASC"
+ bounds = lower_bound(
+ RoomStreamToken.parse(from_key), self.database_engine
+ )
+ if to_key:
+ bounds = "%s AND %s" % (bounds, upper_bound(
+ RoomStreamToken.parse(to_key), self.database_engine
+ ))
+
+ filter_clause, filter_args = filter_to_clause(event_filter)
+
+ if filter_clause:
+ bounds += " AND " + filter_clause
+ args.extend(filter_args)
+
+ if int(limit) > 0:
+ args.append(int(limit))
+ limit_str = " LIMIT ?"
+ else:
+ limit_str = ""
+
+ sql = (
+ "SELECT * FROM events"
+ " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
+ " ORDER BY topological_ordering %(order)s,"
+ " stream_ordering %(order)s %(limit)s"
+ ) % {
+ "bounds": bounds,
+ "order": order,
+ "limit": limit_str
+ }
+
+ def f(txn):
+ txn.execute(sql, args)
+
+ rows = self.cursor_to_dict(txn)
+
+ if rows:
+ topo = rows[-1]["topological_ordering"]
+ toke = rows[-1]["stream_ordering"]
+ if direction == 'b':
+ # Tokens are positions between events.
+ # This token points *after* the last event in the chunk.
+ # We need it to point to the event before it in the chunk
+ # when we are going backwards so we subtract one from the
+ # stream part.
+ toke -= 1
+ next_token = str(RoomStreamToken(topo, toke))
+ else:
+ # TODO (erikj): We should work out what to do here instead.
+ next_token = to_key if to_key else from_key
+
+ return rows, next_token,
+
+ rows, token = yield self.runInteraction("paginate_room_events", f)
+
+ events = yield self._get_events(
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
+
+ self._set_before_and_after(events, rows)
+
+ defer.returnValue((events, token))
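
The moved `paginate_room_events` keeps the token convention used throughout stream.py: stream-only tokens serialize as `s<stream>`, topological tokens as `t<topo>-<stream>`, and a token names a position *between* events. That is why the backwards branch subtracts one from the stream part before building the next token: the rows come back newest-to-oldest, and the continuation token must point just before the oldest row returned. A toy model of that adjustment, independent of the real `RoomStreamToken` class:

    def next_backwards_token(rows):
        """Toy illustration: rows are ordered newest-to-oldest, as in the
        direction='b' branch of paginate_room_events."""
        if not rows:
            return None
        topo = rows[-1]["topological_ordering"]
        stream = rows[-1]["stream_ordering"]
        # Tokens sit between events; to continue backwards from the oldest
        # event returned, point just before it by decrementing the stream
        # part.
        return "t%d-%d" % (topo, stream - 1)

    rows = [
        {"topological_ordering": 7, "stream_ordering": 120},
        {"topological_ordering": 6, "stream_ordering": 118},
    ]
    assert next_backwards_token(rows) == "t6-117"
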
|