diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 5d5b5f7c44..470b7b7663 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -13,30 +13,35 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from twisted.internet import defer
from synapse.api.events.room import (
RoomMemberEvent, MessageEvent, RoomTopicEvent, FeedbackEvent,
- RoomConfigEvent
+ RoomConfigEvent, RoomNameEvent,
)
+from synapse.util.logutils import log_function
+
from .directory import DirectoryStore
from .feedback import FeedbackStore
-from .message import MessageStore
from .presence import PresenceStore
from .profile import ProfileStore
from .registration import RegistrationStore
from .room import RoomStore
from .roommember import RoomMemberStore
-from .roomdata import RoomDataStore
from .stream import StreamStore
from .pdu import StatePduStore, PduStore
from .transactions import TransactionStore
import json
+import logging
import os
-class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore,
+logger = logging.getLogger(__name__)
+
+
+class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
PresenceStore, PduStore, StatePduStore, TransactionStore,
DirectoryStore):
@@ -44,51 +49,139 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore,
def __init__(self, hs):
super(DataStore, self).__init__(hs)
self.event_factory = hs.get_event_factory()
+ self.hs = hs
- def persist_event(self, event):
- if event.type == MessageEvent.TYPE:
- return self.store_message(
- user_id=event.user_id,
- room_id=event.room_id,
- msg_id=event.msg_id,
- content=json.dumps(event.content)
- )
- elif event.type == RoomMemberEvent.TYPE:
- return self.store_room_member(
- user_id=event.target_user_id,
- sender=event.user_id,
- room_id=event.room_id,
- content=event.content,
- membership=event.content["membership"]
- )
+ self.min_token_deferred = self._get_min_token()
+ self.min_token = None
+
+ @defer.inlineCallbacks
+ @log_function
+ def persist_event(self, event, backfilled=False):
+ if event.type == RoomMemberEvent.TYPE:
+ yield self._store_room_member(event)
elif event.type == FeedbackEvent.TYPE:
- return self.store_feedback(
- room_id=event.room_id,
- msg_id=event.msg_id,
- msg_sender_id=event.msg_sender_id,
- fb_sender_id=event.user_id,
- fb_type=event.feedback_type,
- content=json.dumps(event.content)
- )
+ yield self._store_feedback(event)
+# elif event.type == RoomConfigEvent.TYPE:
+# yield self._store_room_config(event)
+ elif event.type == RoomNameEvent.TYPE:
+ yield self._store_room_name(event)
elif event.type == RoomTopicEvent.TYPE:
- return self.store_room_data(
- room_id=event.room_id,
- etype=event.type,
- state_key=event.state_key,
- content=json.dumps(event.content)
+ yield self._store_room_topic(event)
+
+ ret = yield self._store_event(event, backfilled)
+ defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def get_event(self, event_id):
+ events_dict = yield self._simple_select_one(
+ "events",
+ {"event_id": event_id},
+ [
+ "event_id",
+ "type",
+ "sender",
+ "room_id",
+ "content",
+ "unrecognized_keys"
+ ],
+ )
+
+ event = self._parse_event_from_row(events_dict)
+ defer.returnValue(event)
+
+ @defer.inlineCallbacks
+ @log_function
+ def _store_event(self, event, backfilled):
+ # FIXME (erikj): This should be removed when we start amalgamating
+ # event and pdu storage
+ yield self.hs.get_federation().fill_out_prev_events(event)
+
+ vals = {
+ "topological_ordering": event.depth,
+ "event_id": event.event_id,
+ "type": event.type,
+ "room_id": event.room_id,
+ "content": json.dumps(event.content),
+ "processed": True,
+ }
+
+ if backfilled:
+ if not self.min_token_deferred.called:
+ yield self.min_token_deferred
+ self.min_token -= 1
+ vals["stream_ordering"] = self.min_token
+
+ unrec = {
+ k: v
+ for k, v in event.get_full_dict().items()
+ if k not in vals.keys()
+ }
+ vals["unrecognized_keys"] = json.dumps(unrec)
+
+ try:
+ yield self._simple_insert("events", vals)
+ except:
+ logger.exception("Failed to persist, probably duplicate")
+ return
+
+ if not backfilled and hasattr(event, "state_key"):
+ vals = {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ }
+
+ if hasattr(event, "prev_state"):
+ vals["prev_state"] = event.prev_state
+
+ yield self._simple_insert("state_events", vals)
+
+ yield self._simple_insert(
+ "current_state_events",
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ }
)
- elif event.type == RoomConfigEvent.TYPE:
- if "visibility" in event.content:
- visibility = event.content["visibility"]
- return self.store_room_config(
- room_id=event.room_id,
- visibility=visibility
- )
+ latest = yield self.get_room_events_max_id()
+ defer.returnValue(latest)
+
+ @defer.inlineCallbacks
+ def get_current_state(self, room_id, event_type=None, state_key=""):
+ sql = (
+ "SELECT e.* FROM events as e "
+ "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
+ "INNER JOIN state_events as s ON e.event_id = s.event_id "
+ "WHERE c.room_id = ? "
+ )
+
+ if event_type:
+ sql += " AND s.type = ? AND s.state_key = ? "
+ args = (room_id, event_type, state_key)
else:
- raise NotImplementedError(
- "Don't know how to persist type=%s" % event.type
- )
+ args = (room_id, )
+
+ results = yield self._execute_and_decode(sql, *args)
+
+ defer.returnValue([self._parse_event_from_row(r) for r in results])
+
+ @defer.inlineCallbacks
+ def _get_min_token(self):
+ row = yield self._execute(
+ None,
+ "SELECT MIN(stream_ordering) FROM events"
+ )
+
+ self.min_token = row[0][0] if row and row[0] and row[0][0] else -1
+ self.min_token = min(self.min_token, -1)
+
+ logger.debug("min_token is: %s", self.min_token)
+
+ defer.returnValue(self.min_token)
def schema_path(schema):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index bf1800f4bf..36cc57c1b8 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
from twisted.internet import defer
@@ -20,6 +19,9 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
import collections
+import copy
+import json
+
logger = logging.getLogger(__name__)
@@ -29,6 +31,7 @@ class SQLBaseStore(object):
def __init__(self, hs):
self.hs = hs
self._db_pool = hs.get_db_pool()
+ self.event_factory = hs.get_event_factory()
self._clock = hs.get_clock()
def cursor_to_dict(self, cursor):
@@ -57,14 +60,22 @@ class SQLBaseStore(object):
The result of decoder(results)
"""
logger.debug(
- "[SQL] %s Args=%s Func=%s", query, args, decoder.__name__
+ "[SQL] %s Args=%s Func=%s",
+ query, args, decoder.__name__ if decoder else None
)
def interaction(txn):
cursor = txn.execute(query, args)
- return decoder(cursor)
+ if decoder:
+ return decoder(cursor)
+ else:
+ return cursor.fetchall()
+
return self._db_pool.runInteraction(interaction)
+ def _execute_and_decode(self, query, *args):
+ return self._execute(self.cursor_to_dict, query, *args)
+
# "Simple" SQL API methods that operate on a single table with no JOINs,
# no complex WHERE clauses, just a dict of values for columns.
@@ -281,6 +292,17 @@ class SQLBaseStore(object):
return self._db_pool.runInteraction(func)
+ def _parse_event_from_row(self, row_dict):
+ d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
+ d.update(json.loads(row_dict["unrecognized_keys"]))
+ d["content"] = json.loads(d["content"])
+ del d["unrecognized_keys"]
+
+ return self.event_factory.create_event(
+ etype=d["type"],
+ **d
+ )
+
class Table(object):
""" A base class used to store information about a particular table.
diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py
index 9bd562c762..e60f98d1e1 100644
--- a/synapse/storage/feedback.py
+++ b/synapse/storage/feedback.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from twisted.internet import defer
+
from ._base import SQLBaseStore, Table
from synapse.api.events.room import FeedbackEvent
@@ -22,54 +24,28 @@ import json
class FeedbackStore(SQLBaseStore):
- def store_feedback(self, room_id, msg_id, msg_sender_id,
- fb_sender_id, fb_type, content):
- return self._simple_insert(FeedbackTable.table_name, dict(
- room_id=room_id,
- msg_id=msg_id,
- msg_sender_id=msg_sender_id,
- fb_sender_id=fb_sender_id,
- fb_type=fb_type,
- content=content,
- ))
-
- def get_feedback(self, room_id=None, msg_id=None, msg_sender_id=None,
- fb_sender_id=None, fb_type=None):
- query = FeedbackTable.select_statement(
- "msg_sender_id = ? AND room_id = ? AND msg_id = ? " +
- "AND fb_sender_id = ? AND feedback_type = ? " +
- "ORDER BY id DESC LIMIT 1")
- return self._execute(
- FeedbackTable.decode_single_result,
- query, msg_sender_id, room_id, msg_id, fb_sender_id, fb_type,
+ def _store_feedback(self, event):
+ return self._simple_insert("feedback", {
+ "event_id": event.event_id,
+ "feedback_type": event.feedback_type,
+ "room_id": event.room_id,
+ "target_event_id": event.target_event,
+ "sender": event.user_id,
+ })
+
+ @defer.inlineCallbacks
+ def get_feedback_for_event(self, event_id):
+ sql = (
+ "SELECT events.* FROM events INNER JOIN feedback "
+ "ON events.event_id = feedback.event_id "
+ "WHERE feedback.target_event_id = ? "
)
- def get_max_feedback_id(self):
- return self._simple_max_id(FeedbackTable.table_name)
-
-
-class FeedbackTable(Table):
- table_name = "feedback"
+ rows = yield self._execute_and_decode(sql, event_id)
- fields = [
- "id",
- "content",
- "feedback_type",
- "fb_sender_id",
- "msg_id",
- "room_id",
- "msg_sender_id"
- ]
-
- class EntryType(collections.namedtuple("FeedbackEntry", fields)):
-
- def as_event(self, event_factory):
- return event_factory.create_event(
- etype=FeedbackEvent.TYPE,
- room_id=self.room_id,
- msg_id=self.msg_id,
- msg_sender_id=self.msg_sender_id,
- user_id=self.fb_sender_id,
- feedback_type=self.feedback_type,
- content=json.loads(self.content),
- )
+ defer.returnValue(
+ [
+ self._parse_event_from_row(r)
+ for r in rows
+ ]
+ )
diff --git a/synapse/storage/message.py b/synapse/storage/message.py
deleted file mode 100644
index 7bb69c1384..0000000000
--- a/synapse/storage/message.py
+++ /dev/null
@@ -1,81 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014 matrix.org
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ._base import SQLBaseStore, Table
-from synapse.api.events.room import MessageEvent
-
-import collections
-import json
-
-
-class MessageStore(SQLBaseStore):
-
- def get_message(self, user_id, room_id, msg_id):
- """Get a message from the store.
-
- Args:
- user_id (str): The ID of the user who sent the message.
- room_id (str): The room the message was sent in.
- msg_id (str): The unique ID for this user/room combo.
- """
- query = MessagesTable.select_statement(
- "user_id = ? AND room_id = ? AND msg_id = ? " +
- "ORDER BY id DESC LIMIT 1")
- return self._execute(
- MessagesTable.decode_single_result,
- query, user_id, room_id, msg_id,
- )
-
- def store_message(self, user_id, room_id, msg_id, content):
- """Store a message in the store.
-
- Args:
- user_id (str): The ID of the user who sent the message.
- room_id (str): The room the message was sent in.
- msg_id (str): The unique ID for this user/room combo.
- content (str): The content of the message (JSON)
- """
- return self._simple_insert(MessagesTable.table_name, dict(
- user_id=user_id,
- room_id=room_id,
- msg_id=msg_id,
- content=content,
- ))
-
- def get_max_message_id(self):
- return self._simple_max_id(MessagesTable.table_name)
-
-
-class MessagesTable(Table):
- table_name = "messages"
-
- fields = [
- "id",
- "user_id",
- "room_id",
- "msg_id",
- "content"
- ]
-
- class EntryType(collections.namedtuple("MessageEntry", fields)):
-
- def as_event(self, event_factory):
- return event_factory.create_event(
- etype=MessageEvent.TYPE,
- room_id=self.room_id,
- user_id=self.user_id,
- msg_id=self.msg_id,
- content=json.loads(self.content),
- )
diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py
index 13adc581e1..7655f43ede 100644
--- a/synapse/storage/pdu.py
+++ b/synapse/storage/pdu.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from twisted.internet import defer
+
from ._base import SQLBaseStore, Table, JoinHelper
from synapse.util.logutils import log_function
@@ -319,6 +321,7 @@ class PduStore(SQLBaseStore):
return [(row[0], row[1], row[2]) for row in results]
+ @defer.inlineCallbacks
def get_oldest_pdus_in_context(self, context):
"""Get a list of Pdus that we haven't backfilled beyond yet (and haven't
seen). This list is used when we want to backfill backwards and is the
@@ -331,17 +334,14 @@ class PduStore(SQLBaseStore):
Returns:
list: A list of PduIdTuple.
"""
- return self._db_pool.runInteraction(
- self._get_oldest_pdus_in_context, context
- )
-
- def _get_oldest_pdus_in_context(self, txn, context):
- txn.execute(
+ results = yield self._execute(
+ None,
"SELECT pdu_id, origin FROM %(back)s WHERE context = ?"
% {"back": PduBackwardExtremitiesTable.table_name, },
- (context,)
+ context
)
- return [PduIdTuple(i, o) for i, o in txn.fetchall()]
+
+ defer.returnValue([PduIdTuple(i, o) for i, o in results])
def is_pdu_new(self, pdu_id, origin, context, depth):
"""For a given Pdu, try and figure out if it's 'new', i.e., if it's
@@ -580,7 +580,7 @@ class StatePduStore(SQLBaseStore):
txn.execute(query, query_args)
- def get_current_state(self, context, pdu_type, state_key):
+ def get_current_state_pdu(self, context, pdu_type, state_key):
"""For a given context, pdu_type, state_key 3-tuple, return what is
currently considered the current state.
@@ -595,10 +595,10 @@ class StatePduStore(SQLBaseStore):
"""
return self._db_pool.runInteraction(
- self._get_current_state, context, pdu_type, state_key
+ self._get_current_state_pdu, context, pdu_type, state_key
)
- def _get_current_state(self, txn, context, pdu_type, state_key):
+ def _get_current_state_pdu(self, txn, context, pdu_type, state_key):
return self._get_current_interaction(txn, context, pdu_type, state_key)
def _get_current_interaction(self, txn, context, pdu_type, state_key):
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index a97162831b..22f2dcca45 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -76,49 +76,80 @@ class RoomStore(SQLBaseStore):
)
@defer.inlineCallbacks
- def get_rooms(self, is_public, with_topics):
+ def get_rooms(self, is_public):
"""Retrieve a list of all public rooms.
Args:
is_public (bool): True if the rooms returned should be public.
- with_topics (bool): True to include the current topic for the room
- in the response.
Returns:
- A list of room dicts containing at least a "room_id" key, and a
- "topic" key if one is set and with_topic=True.
+ A list of room dicts containing at least a "room_id" key, a
+ "topic" key if one is set, and a "name" key if one is set
"""
- room_data_type = RoomTopicEvent.TYPE
- public = 1 if is_public else 0
-
- latest_topic = ("SELECT max(room_data.id) FROM room_data WHERE "
- + "room_data.type = ? GROUP BY room_id")
-
- query = ("SELECT rooms.*, room_data.content, room_alias FROM rooms "
- + "LEFT JOIN "
- + "room_aliases ON room_aliases.room_id = rooms.room_id "
- + "LEFT JOIN "
- + "room_data ON rooms.room_id = room_data.room_id WHERE "
- + "(room_data.id IN (" + latest_topic + ") "
- + "OR room_data.id IS NULL) AND rooms.is_public = ?")
-
- res = yield self._execute(
- self.cursor_to_dict, query, room_data_type, public
+
+ topic_subquery = (
+ "SELECT topics.event_id as event_id, "
+ "topics.room_id as room_id, topic "
+ "FROM topics "
+ "INNER JOIN current_state_events as c "
+ "ON c.event_id = topics.event_id "
)
- # return only the keys the specification expects
- ret_keys = ["room_id", "topic", "room_alias"]
+ name_subquery = (
+ "SELECT room_names.event_id as event_id, "
+ "room_names.room_id as room_id, name "
+ "FROM room_names "
+ "INNER JOIN current_state_events as c "
+ "ON c.event_id = room_names.event_id "
+ )
- # extract topic from the json (icky) FIXME
- for i, room_row in enumerate(res):
- try:
- content_json = json.loads(room_row["content"])
- room_row["topic"] = content_json["topic"]
- except:
- pass # no topic set
- # filter the dict based on ret_keys
- res[i] = {k: v for k, v in room_row.iteritems() if k in ret_keys}
+ # We use non printing ascii character US () as a seperator
+ sql = (
+ "SELECT r.room_id, n.name, t.topic, "
+ "group_concat(a.room_alias, '') "
+ "FROM rooms AS r "
+ "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id "
+ "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id "
+ "INNER JOIN room_aliases AS a ON a.room_id = r.room_id "
+ "WHERE r.is_public = ? "
+ "GROUP BY r.room_id "
+ ) % {
+ "topic": topic_subquery,
+ "name": name_subquery,
+ }
+
+ rows = yield self._execute(None, sql, is_public)
+
+ ret = [
+ {
+ "room_id": r[0],
+ "name": r[1],
+ "topic": r[2],
+ "aliases": r[3].split(""),
+ }
+ for r in rows
+ ]
+
+ defer.returnValue(ret)
+
+ def _store_room_topic(self, event):
+ return self._simple_insert(
+ "topics",
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "topic": event.topic,
+ }
+ )
- defer.returnValue(res)
+ def _store_room_name(self, event):
+ return self._simple_insert(
+ "room_names",
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "name": event.name,
+ }
+ )
class RoomsTable(Table):
diff --git a/synapse/storage/roomdata.py b/synapse/storage/roomdata.py
deleted file mode 100644
index cc04d1ba14..0000000000
--- a/synapse/storage/roomdata.py
+++ /dev/null
@@ -1,85 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014 matrix.org
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ._base import SQLBaseStore, Table
-
-import collections
-import json
-
-
-class RoomDataStore(SQLBaseStore):
-
- """Provides various CRUD operations for Room Events. """
-
- def get_room_data(self, room_id, etype, state_key=""):
- """Retrieve the data stored under this type and state_key.
-
- Args:
- room_id (str)
- etype (str)
- state_key (str)
- Returns:
- namedtuple: Or None if nothing exists at this path.
- """
- query = RoomDataTable.select_statement(
- "room_id = ? AND type = ? AND state_key = ? "
- "ORDER BY id DESC LIMIT 1"
- )
- return self._execute(
- RoomDataTable.decode_single_result,
- query, room_id, etype, state_key,
- )
-
- def store_room_data(self, room_id, etype, state_key="", content=None):
- """Stores room specific data.
-
- Args:
- room_id (str)
- etype (str)
- state_key (str)
- data (str)- The data to store for this path in JSON.
- Returns:
- The store ID for this data.
- """
- return self._simple_insert(RoomDataTable.table_name, dict(
- etype=etype,
- state_key=state_key,
- room_id=room_id,
- content=content,
- ))
-
- def get_max_room_data_id(self):
- return self._simple_max_id(RoomDataTable.table_name)
-
-
-class RoomDataTable(Table):
- table_name = "room_data"
-
- fields = [
- "id",
- "room_id",
- "type",
- "state_key",
- "content"
- ]
-
- class EntryType(collections.namedtuple("RoomDataEntry", fields)):
-
- def as_event(self, event_factory):
- return event_factory.create_event(
- etype=self.type,
- room_id=self.room_id,
- content=json.loads(self.content),
- )
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index c45d128f1b..89c87290cf 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -31,6 +31,38 @@ logger = logging.getLogger(__name__)
class RoomMemberStore(SQLBaseStore):
+ @defer.inlineCallbacks
+ def _store_room_member(self, event):
+ """Store a room member in the database.
+ """
+ domain = self.hs.parse_userid(event.target_user_id).domain
+
+ yield self._simple_insert(
+ "room_memberships",
+ {
+ "event_id": event.event_id,
+ "user_id": event.target_user_id,
+ "sender": event.user_id,
+ "room_id": event.room_id,
+ "membership": event.membership,
+ }
+ )
+
+ # Update room hosts table
+ if event.membership == Membership.JOIN:
+ sql = (
+ "INSERT OR IGNORE INTO room_hosts (room_id, host) "
+ "VALUES (?, ?)"
+ )
+ yield self._execute(None, sql, event.room_id, domain)
+ else:
+ sql = (
+ "DELETE FROM room_hosts WHERE room_id = ? AND host = ?"
+ )
+
+ yield self._execute(None, sql, event.room_id, domain)
+
+ @defer.inlineCallbacks
def get_room_member(self, user_id, room_id):
"""Retrieve the current state of a room member.
@@ -38,36 +70,15 @@ class RoomMemberStore(SQLBaseStore):
user_id (str): The member's user ID.
room_id (str): The room the member is in.
Returns:
- namedtuple: The room member from the database, or None if this
- member does not exist.
+ Deferred: Results in a MembershipEvent or None.
"""
- query = RoomMemberTable.select_statement(
- "room_id = ? AND user_id = ? ORDER BY id DESC LIMIT 1")
- return self._execute(
- RoomMemberTable.decode_single_result,
- query, room_id, user_id,
- )
+ rows = yield self._get_members_by_dict({
+ "e.room_id": room_id,
+ "m.user_id": user_id,
+ })
- def store_room_member(self, user_id, sender, room_id, membership, content):
- """Store a room member in the database.
+ defer.returnValue(rows[0] if rows else None)
- Args:
- user_id (str): The member's user ID.
- room_id (str): The room in relation to the member.
- membership (synapse.api.constants.Membership): The new membership
- state.
- content (dict): The content of the membership (JSON).
- """
- content_json = json.dumps(content)
- return self._simple_insert(RoomMemberTable.table_name, dict(
- user_id=user_id,
- sender=sender,
- room_id=room_id,
- membership=membership,
- content=content_json,
- ))
-
- @defer.inlineCallbacks
def get_room_members(self, room_id, membership=None):
"""Retrieve the current room member list for a room.
@@ -79,17 +90,12 @@ class RoomMemberStore(SQLBaseStore):
Returns:
list of namedtuples representing the members in this room.
"""
- query = RoomMemberTable.select_statement(
- "id IN (SELECT MAX(id) FROM " + RoomMemberTable.table_name
- + " WHERE room_id = ? GROUP BY user_id)"
- )
- res = yield self._execute(
- RoomMemberTable.decode_results, query, room_id,
- )
- # strip memberships which don't match
+
+ where = {"m.room_id": room_id}
if membership:
- res = [entry for entry in res if entry.membership == membership]
- defer.returnValue(res)
+ where["m.membership"] = membership
+
+ return self._get_members_by_dict(where)
def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
""" Get all the rooms for this user where the membership for this user
@@ -106,70 +112,40 @@ class RoomMemberStore(SQLBaseStore):
return defer.succeed(None)
args = [user_id]
- membership_placeholder = ["membership=?"] * len(membership_list)
- where_membership = "(" + " OR ".join(membership_placeholder) + ")"
- for membership in membership_list:
- args.append(membership)
-
- # sub-select finds the row ID for the most recent (i.e. current)
- # state change of this user per room, then the outer select finds those
- query = ("SELECT room_id, membership FROM room_memberships"
- + " WHERE id IN (SELECT MAX(id) FROM room_memberships"
- + " WHERE user_id=? GROUP BY room_id)"
- + " AND " + where_membership)
- return self._execute(
- self.cursor_to_dict, query, *args
- )
+ args.extend(membership_list)
- @defer.inlineCallbacks
- def get_joined_hosts_for_room(self, room_id):
- query = RoomMemberTable.select_statement(
- "id IN (SELECT MAX(id) FROM " + RoomMemberTable.table_name
- + " WHERE room_id = ? GROUP BY user_id)"
- )
-
- res = yield self._execute(
- RoomMemberTable.decode_results, query, room_id,
+ where_clause = "user_id = ? AND (%s)" % (
+ " OR ".join(["membership = ?" for _ in membership_list]),
)
- def host_from_user_id_string(user_id):
- domain = UserID.from_string(entry.user_id, self.hs).domain
- return domain
-
- # strip memberships which don't match
- hosts = [
- host_from_user_id_string(entry.user_id)
- for entry in res
- if entry.membership == Membership.JOIN
- ]
+ return self._get_members_query(where_clause, args)
- logger.debug("Returning hosts: %s from results: %s", hosts, res)
-
- defer.returnValue(hosts)
-
- def get_max_room_member_id(self):
- return self._simple_max_id(RoomMemberTable.table_name)
-
-
-class RoomMemberTable(Table):
- table_name = "room_memberships"
-
- fields = [
- "id",
- "user_id",
- "sender",
- "room_id",
- "membership",
- "content"
- ]
+ def get_joined_hosts_for_room(self, room_id):
+ return self._simple_select_onecol(
+ "room_hosts",
+ {"room_id": room_id},
+ "host"
+ )
- class EntryType(collections.namedtuple("RoomMemberEntry", fields)):
+ def _get_members_by_dict(self, where_dict):
+ clause = " AND ".join("%s = ?" % k for k in where_dict.keys())
+ vals = where_dict.values()
+ return self._get_members_query(clause, vals)
- def as_event(self, event_factory):
- return event_factory.create_event(
- etype=RoomMemberEvent.TYPE,
- room_id=self.room_id,
- target_user_id=self.user_id,
- user_id=self.sender,
- content=json.loads(self.content),
- )
+ @defer.inlineCallbacks
+ def _get_members_query(self, where_clause, where_values):
+ sql = (
+ "SELECT e.* FROM events as e "
+ "INNER JOIN room_memberships as m "
+ "ON e.event_id = m.event_id "
+ "INNER JOIN current_state_events as c "
+ "ON m.event_id = c.event_id "
+ "WHERE %s "
+ ) % (where_clause,)
+
+ rows = yield self._execute_and_decode(sql, *where_values)
+
+ logger.debug("_get_members_query Got rows %s", rows)
+
+ results = [self._parse_event_from_row(r) for r in rows]
+ defer.returnValue(results)
diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql
index 77096546b2..ea04261ff0 100644
--- a/synapse/storage/schema/im.sql
+++ b/synapse/storage/schema/im.sql
@@ -12,43 +12,70 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-CREATE TABLE IF NOT EXISTS rooms(
- room_id TEXT PRIMARY KEY NOT NULL,
- is_public INTEGER,
- creator TEXT
+
+CREATE TABLE IF NOT EXISTS events(
+ stream_ordering INTEGER PRIMARY KEY AUTOINCREMENT,
+ topological_ordering INTEGER NOT NULL,
+ event_id TEXT NOT NULL,
+ type TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ content TEXT NOT NULL,
+ unrecognized_keys TEXT,
+ processed BOOL NOT NULL,
+ CONSTRAINT ev_uniq UNIQUE (event_id)
);
-CREATE TABLE IF NOT EXISTS room_memberships(
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- user_id TEXT NOT NULL, -- no foreign key to users table, it could be an id belonging to another home server
- sender TEXT NOT NULL,
+CREATE TABLE IF NOT EXISTS state_events(
+ event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
- membership TEXT NOT NULL,
- content TEXT NOT NULL
+ type TEXT NOT NULL,
+ state_key TEXT NOT NULL,
+ prev_state TEXT
);
-CREATE TABLE IF NOT EXISTS messages(
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- user_id TEXT,
- room_id TEXT,
- msg_id TEXT,
- content TEXT
+CREATE TABLE IF NOT EXISTS current_state_events(
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ type TEXT NOT NULL,
+ state_key TEXT NOT NULL,
+ CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
+);
+
+CREATE TABLE IF NOT EXISTS room_memberships(
+ event_id TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ sender TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ membership TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS feedback(
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- content TEXT,
+ event_id TEXT NOT NULL,
feedback_type TEXT,
- fb_sender_id TEXT,
- msg_id TEXT,
- room_id TEXT,
- msg_sender_id TEXT
+ target_event_id TEXT,
+ sender TEXT,
+ room_id TEXT
);
-CREATE TABLE IF NOT EXISTS room_data(
- id INTEGER PRIMARY KEY AUTOINCREMENT,
+CREATE TABLE IF NOT EXISTS topics(
+ event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
- type TEXT NOT NULL,
- state_key TEXT NOT NULL,
- content TEXT
+ topic TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS room_names(
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ name TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS rooms(
+ room_id TEXT PRIMARY KEY NOT NULL,
+ is_public INTEGER,
+ creator TEXT
+);
+
+CREATE TABLE IF NOT EXISTS room_hosts(
+ room_id TEXT NOT NULL,
+ host TEXT NOT NULL
);
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 47a1f2c45a..3a67baa261 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -13,267 +13,282 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+""" This module is responsible for getting events from the DB for pagination
+and event streaming.
+
+The order it returns events in depend on whether we are streaming forwards or
+are paginating backwards. We do this because we want to handle out of order
+messages nicely, while still returning them in the correct order when we
+paginate bacwards.
+
+This is implemented by keeping two ordering columns: stream_ordering and
+topological_ordering. Stream ordering is basically insertion/received order
+(except for events from backfill requests). The topolgical_ordering is a
+weak ordering of events based on the pdu graph.
+
+This means that we have to have two different types of tokens, depending on
+what sort order was used:
+ - stream tokens are of the form: "s%d", which maps directly to the column
+ - topological tokems: "t%d-%d", where the integers map to the topological
+ and stream ordering columns respectively.
+"""
+
+from twisted.internet import defer
from ._base import SQLBaseStore
-from .message import MessagesTable
-from .feedback import FeedbackTable
-from .roomdata import RoomDataTable
-from .roommember import RoomMemberTable
+from synapse.api.errors import SynapseError
+from synapse.api.constants import Membership
+from synapse.util.logutils import log_function
import json
import logging
+
logger = logging.getLogger(__name__)
+MAX_STREAM_SIZE = 1000
+
+
+_STREAM_TOKEN = "stream"
+_TOPOLOGICAL_TOKEN = "topological"
+
+
+def _parse_stream_token(string):
+ try:
+ if string[0] != 's':
+ raise
+ return int(string[1:])
+ except:
+ raise SynapseError(400, "Invalid token")
+
+
+def _parse_topological_token(string):
+ try:
+ if string[0] != 't':
+ raise
+ parts = string[1:].split('-', 1)
+ return (int(parts[0]), int(parts[1]))
+ except:
+ raise SynapseError(400, "Invalid token")
+
+
+def is_stream_token(string):
+ try:
+ _parse_stream_token(string)
+ return True
+ except:
+ return False
+
+
+def is_topological_token(string):
+ try:
+ _parse_topological_token(string)
+ return True
+ except:
+ return False
+
+
+def _get_token_bound(token, comparison):
+ try:
+ s = _parse_stream_token(token)
+ return "%s %s %d" % ("stream_ordering", comparison, s)
+ except:
+ pass
+
+ try:
+ top, stream = _parse_topological_token(token)
+ return "%s %s %d AND %s %s %d" % (
+ "topological_ordering", comparison, top,
+ "stream_ordering", comparison, stream,
+ )
+ except:
+ pass
+
+ raise SynapseError(400, "Invalid token")
+
+
class StreamStore(SQLBaseStore):
+ @log_function
+ def get_room_events(self, user_id, from_key, to_key, room_id, limit=0,
+ direction='f', with_feedback=False):
+ is_events = (
+ direction == 'f'
+ and is_stream_token(from_key)
+ and to_key and is_stream_token(to_key)
+ )
- def get_message_stream(self, user_id, from_key, to_key, room_id, limit=0,
- with_feedback=False):
- """Get all messages for this user between the given keys.
-
- Args:
- user_id (str): The user who is requesting messages.
- from_key (int): The ID to start returning results from (exclusive).
- to_key (int): The ID to stop returning results (exclusive).
- room_id (str): Gets messages only for this room. Can be None, in
- which case all room messages will be returned.
- Returns:
- A tuple of rows (list of namedtuples), new_id(int)
- """
- if with_feedback and room_id: # with fb MUST specify a room ID
- return self._db_pool.runInteraction(
- self._get_message_rows_with_feedback,
- user_id, from_key, to_key, room_id, limit
+ if is_events:
+ return self.get_room_events_stream(
+ user_id=user_id,
+ from_key=from_key,
+ to_key=to_key,
+ room_id=room_id,
+ limit=limit,
+ with_feedback=with_feedback,
)
else:
- return self._db_pool.runInteraction(
- self._get_message_rows,
- user_id, from_key, to_key, room_id, limit
+ return self.paginate_room_events(
+ from_key=from_key,
+ to_key=to_key,
+ room_id=room_id,
+ limit=limit,
+ with_feedback=with_feedback,
)
- def _get_message_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
- limit):
- # work out which rooms this user is joined in on and join them with
- # the room id on the messages table, bounded by the specified pkeys
+ @defer.inlineCallbacks
+ @log_function
+ def get_room_events_stream(self, user_id, from_key, to_key, room_id,
+ limit=0, with_feedback=False):
+ # TODO (erikj): Handle compressed feedback
- # get all messages where the *current* membership state is 'join' for
- # this user in that room.
- query = ("SELECT messages.* FROM messages WHERE ? IN"
- + " (SELECT membership from room_memberships WHERE user_id=?"
- + " AND room_id = messages.room_id ORDER BY id DESC LIMIT 1)")
- query_args = ["join", user_id]
+ current_room_membership_sql = (
+ "SELECT m.room_id FROM room_memberships as m "
+ "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
+ "WHERE m.user_id = ?"
+ )
- if room_id:
- query += " AND messages.room_id=?"
- query_args.append(room_id)
+ invites_sql = (
+ "SELECT m.event_id FROM room_memberships as m "
+ "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
+ "WHERE m.user_id = ? AND m.membership = ?"
+ )
- (query, query_args) = self._append_stream_operations(
- "messages", query, query_args, from_pkey, to_pkey, limit=limit
+ if limit:
+ limit = max(limit, MAX_STREAM_SIZE)
+ else:
+ limit = MAX_STREAM_SIZE
+
+ # From and to keys should be integers from ordering.
+ from_id = _parse_stream_token(from_key)
+ to_id = _parse_stream_token(to_key)
+
+ if from_key == to_key:
+ defer.returnValue(([], to_key))
+ return
+
+ sql = (
+ "SELECT * FROM events as e WHERE "
+ "((room_id IN (%(current)s)) OR "
+ "(event_id IN (%(invites)s))) "
+ "AND e.stream_ordering > ? AND e.stream_ordering < ? "
+ "ORDER BY stream_ordering ASC LIMIT %(limit)d "
+ ) % {
+ "current": current_room_membership_sql,
+ "invites": invites_sql,
+ "limit": limit
+ }
+
+ rows = yield self._execute_and_decode(
+ sql,
+ user_id, user_id, Membership.INVITE, from_id, to_id
)
- cursor = txn.execute(query, query_args)
- return self._as_events(cursor, MessagesTable, from_pkey)
+ ret = [self._parse_event_from_row(r) for r in rows]
- def _get_message_rows_with_feedback(self, txn, user_id, from_pkey, to_pkey,
- room_id, limit):
- # this col represents the compressed feedback JSON as per spec
- compressed_feedback_col = (
- "'[' || group_concat('{\"sender_id\":\"' || f.fb_sender_id"
- + " || '\",\"feedback_type\":\"' || f.feedback_type"
- + " || '\",\"content\":' || f.content || '}') || ']'"
- )
+ if rows:
+ key = "s%d" % max([r["stream_ordering"] for r in rows])
+ else:
+ # Assume we didn't get anything because there was nothing to get.
+ key = to_key
+
+ defer.returnValue((ret, key))
- global_msg_id_join = ("f.room_id = messages.room_id"
- + " and f.msg_id = messages.msg_id"
- + " and messages.user_id = f.msg_sender_id")
+ @defer.inlineCallbacks
+ @log_function
+ def paginate_room_events(self, room_id, from_key, to_key=None,
+ direction='b', limit=-1,
+ with_feedback=False):
+ # TODO (erikj): Handle compressed feedback
- select_query = (
- "SELECT messages.*, f.content AS fb_content, f.fb_sender_id"
- + ", " + compressed_feedback_col + " AS compressed_fb"
- + " FROM messages LEFT JOIN feedback f ON " + global_msg_id_join)
+ from_comp = '<' if direction =='b' else '>'
+ to_comp = '>' if direction =='b' else '<'
+ order = "DESC" if direction == 'b' else "ASC"
- current_membership_sub_query = (
- "(SELECT membership from room_memberships rm"
- + " WHERE user_id=? AND room_id = rm.room_id"
- + " ORDER BY id DESC LIMIT 1)")
+ args = [room_id]
- where = (" WHERE ? IN " + current_membership_sub_query
- + " AND messages.room_id=?")
+ bounds = _get_token_bound(from_key, from_comp)
+ if to_key:
+ bounds = "%s AND %s" % (bounds, _get_token_bound(to_key, to_comp))
- query = select_query + where
- query_args = ["join", user_id, room_id]
+ if int(limit) > 0:
+ args.append(int(limit))
+ limit_str = " LIMIT ?"
+ else:
+ limit_str = ""
- (query, query_args) = self._append_stream_operations(
- "messages", query, query_args, from_pkey, to_pkey,
- limit=limit, group_by=" GROUP BY messages.id "
+ sql = (
+ "SELECT * FROM events "
+ "WHERE room_id = ? AND %(bounds)s "
+ "ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s "
+ ) % {"bounds": bounds, "order": order, "limit": limit_str}
+
+ rows = yield self._execute_and_decode(
+ sql,
+ *args
)
- cursor = txn.execute(query, query_args)
-
- # convert the result set into events
- entries = self.cursor_to_dict(cursor)
- events = []
- for entry in entries:
- # TODO we should spec the cursor > event mapping somewhere else.
- event = {}
- straight_mappings = ["msg_id", "user_id", "room_id"]
- for key in straight_mappings:
- event[key] = entry[key]
- event["content"] = json.loads(entry["content"])
- if entry["compressed_fb"]:
- event["feedback"] = json.loads(entry["compressed_fb"])
- events.append(event)
-
- latest_pkey = from_pkey if len(entries) == 0 else entries[-1]["id"]
-
- return (events, latest_pkey)
-
- def get_room_member_stream(self, user_id, from_key, to_key):
- """Get all room membership events for this user between the given keys.
-
- Args:
- user_id (str): The user who is requesting membership events.
- from_key (int): The ID to start returning results from (exclusive).
- to_key (int): The ID to stop returning results (exclusive).
- Returns:
- A tuple of rows (list of namedtuples), new_id(int)
- """
- return self._db_pool.runInteraction(
- self._get_room_member_rows, user_id, from_key, to_key
+ if rows:
+ topo = rows[-1]["topological_ordering"]
+ toke = rows[-1]["stream_ordering"]
+ next_token = "t%s-%s" % (topo, toke)
+ else:
+ # TODO (erikj): We should work out what to do here instead.
+ next_token = to_key if to_key else from_key
+
+ defer.returnValue(
+ (
+ [self._parse_event_from_row(r) for r in rows],
+ next_token
+ )
)
- def _get_room_member_rows(self, txn, user_id, from_pkey, to_pkey):
- # get all room membership events for rooms which the user is
- # *currently* joined in on, or all invite events for this user.
- current_membership_sub_query = (
- "(SELECT membership FROM room_memberships"
- + " WHERE user_id=? AND room_id = rm.room_id"
- + " ORDER BY id DESC LIMIT 1)")
-
- query = ("SELECT rm.* FROM room_memberships rm "
- # all membership events for rooms you've currently joined.
- + " WHERE (? IN " + current_membership_sub_query
- # all invite membership events for this user
- + " OR rm.membership=? AND user_id=?)"
- + " AND rm.id > ?")
- query_args = ["join", user_id, "invite", user_id, from_pkey]
-
- if to_pkey != -1:
- query += " AND rm.id < ?"
- query_args.append(to_pkey)
-
- cursor = txn.execute(query, query_args)
- return self._as_events(cursor, RoomMemberTable, from_pkey)
-
- def get_feedback_stream(self, user_id, from_key, to_key, room_id, limit=0):
- return self._db_pool.runInteraction(
- self._get_feedback_rows,
- user_id, from_key, to_key, room_id, limit
+ @defer.inlineCallbacks
+ def get_recent_events_for_room(self, room_id, limit, with_feedback=False):
+ # TODO (erikj): Handle compressed feedback
+
+ end_token = yield self.get_room_events_max_id()
+
+ sql = (
+ "SELECT * FROM events "
+ "WHERE room_id = ? AND stream_ordering <= ? "
+ "ORDER BY topological_ordering, stream_ordering DESC LIMIT ? "
)
- def _get_feedback_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
- limit):
- # work out which rooms this user is joined in on and join them with
- # the room id on the feedback table, bounded by the specified pkeys
-
- # get all messages where the *current* membership state is 'join' for
- # this user in that room.
- query = (
- "SELECT feedback.* FROM feedback WHERE ? IN "
- + "(SELECT membership from room_memberships WHERE user_id=?"
- + " AND room_id = feedback.room_id ORDER BY id DESC LIMIT 1)")
- query_args = ["join", user_id]
-
- if room_id:
- query += " AND feedback.room_id=?"
- query_args.append(room_id)
-
- (query, query_args) = self._append_stream_operations(
- "feedback", query, query_args, from_pkey, to_pkey, limit=limit
+ rows = yield self._execute_and_decode(
+ sql,
+ room_id, end_token, limit
)
- cursor = txn.execute(query, query_args)
- return self._as_events(cursor, FeedbackTable, from_pkey)
+ rows.reverse() # As we selected with reverse ordering
+
+ if rows:
+ topo = rows[0]["topological_ordering"]
+ toke = rows[0]["stream_ordering"]
+ start_token = "p%s-%s" % (topo, toke)
- def get_room_data_stream(self, user_id, from_key, to_key, room_id,
- limit=0):
- return self._db_pool.runInteraction(
- self._get_room_data_rows,
- user_id, from_key, to_key, room_id, limit
+ token = (start_token, end_token)
+ else:
+ token = (end_token, end_token)
+
+ defer.returnValue(
+ (
+ [self._parse_event_from_row(r) for r in rows],
+ token
+ )
)
- def _get_room_data_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
- limit):
- # work out which rooms this user is joined in on and join them with
- # the room id on the feedback table, bounded by the specified pkeys
-
- # get all messages where the *current* membership state is 'join' for
- # this user in that room.
- query = (
- "SELECT room_data.* FROM room_data WHERE ? IN "
- + "(SELECT membership from room_memberships WHERE user_id=?"
- + " AND room_id = room_data.room_id ORDER BY id DESC LIMIT 1)")
- query_args = ["join", user_id]
-
- if room_id:
- query += " AND room_data.room_id=?"
- query_args.append(room_id)
-
- (query, query_args) = self._append_stream_operations(
- "room_data", query, query_args, from_pkey, to_pkey, limit=limit
+ @defer.inlineCallbacks
+ def get_room_events_max_id(self):
+ res = yield self._execute_and_decode(
+ "SELECT MAX(stream_ordering) as m FROM events"
)
- cursor = txn.execute(query, query_args)
- return self._as_events(cursor, RoomDataTable, from_pkey)
-
- def _append_stream_operations(self, table_name, query, query_args,
- from_pkey, to_pkey, limit=None,
- group_by=""):
- LATEST_ROW = -1
- order_by = ""
- if to_pkey > from_pkey:
- if from_pkey != LATEST_ROW:
- # e.g. from=5 to=9 >> from 5 to 9 >> id>5 AND id<9
- query += (" AND %s.id > ? AND %s.id < ?" %
- (table_name, table_name))
- query_args.append(from_pkey)
- query_args.append(to_pkey)
- else:
- # e.g. from=-1 to=5 >> from now to 5 >> id>5 ORDER BY id DESC
- query += " AND %s.id > ? " % table_name
- order_by = "ORDER BY id DESC"
- query_args.append(to_pkey)
- elif from_pkey > to_pkey:
- if to_pkey != LATEST_ROW:
- # from=9 to=5 >> from 9 to 5 >> id>5 AND id<9 ORDER BY id DESC
- query += (" AND %s.id > ? AND %s.id < ? " %
- (table_name, table_name))
- order_by = "ORDER BY id DESC"
- query_args.append(to_pkey)
- query_args.append(from_pkey)
- else:
- # from=5 to=-1 >> from 5 to now >> id>5
- query += " AND %s.id > ?" % table_name
- query_args.append(from_pkey)
-
- query += group_by + order_by
-
- if limit and limit > 0:
- query += " LIMIT ?"
- query_args.append(str(limit))
-
- return (query, query_args)
-
- def _as_events(self, cursor, table, from_pkey):
- data_entries = table.decode_results(cursor)
- last_pkey = from_pkey
- if data_entries:
- last_pkey = data_entries[-1].id
-
- events = [
- entry.as_event(self.event_factory).get_dict()
- for entry in data_entries
- ]
-
- return (events, last_pkey)
+ logger.debug("get_room_events_max_id: %s", res)
+
+ if not res or not res[0] or not res[0]["m"]:
+ defer.returnValue("s1")
+ return
+
+ key = res[0]["m"] + 1
+ defer.returnValue("s%d" % (key,))
|