diff --git a/synapse/__init__.py b/synapse/__init__.py
index 1e7b2ab272..47fc1b2ea4 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -15,3 +15,5 @@
""" This is a reference implementation of a synapse home server.
"""
+
+__version__ = "0.0.1"
diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py
index 921fd08832..aa04dbece7 100644
--- a/synapse/api/events/__init__.py
+++ b/synapse/api/events/__init__.py
@@ -51,6 +51,7 @@ class SynapseEvent(JsonEncodedObject):
"depth",
"destinations",
"origin",
+ "outlier",
]
required_keys = [
diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py
index 12aa04fc6e..c2cdcddf41 100644
--- a/synapse/api/events/factory.py
+++ b/synapse/api/events/factory.py
@@ -15,7 +15,7 @@
from synapse.api.events.room import (
RoomTopicEvent, MessageEvent, RoomMemberEvent, FeedbackEvent,
- InviteJoinEvent, RoomConfigEvent
+ InviteJoinEvent, RoomConfigEvent, RoomNameEvent, GenericEvent,
)
from synapse.util.stringutils import random_string
@@ -25,6 +25,7 @@ class EventFactory(object):
_event_classes = [
RoomTopicEvent,
+ RoomNameEvent,
MessageEvent,
RoomMemberEvent,
FeedbackEvent,
@@ -32,20 +33,24 @@ class EventFactory(object):
RoomConfigEvent
]
- def __init__(self):
+ def __init__(self, hs):
self._event_list = {} # dict of TYPE to event class
for event_class in EventFactory._event_classes:
self._event_list[event_class.TYPE] = event_class
+ self.clock = hs.get_clock()
+
def create_event(self, etype=None, **kwargs):
kwargs["type"] = etype
if "event_id" not in kwargs:
kwargs["event_id"] = random_string(10)
- try:
+ if "ts" not in kwargs:
+ kwargs["ts"] = int(self.clock.time_msec())
+
+ if etype in self._event_list:
handler = self._event_list[etype]
- except KeyError: # unknown event type
- # TODO allow custom event types.
- raise NotImplementedError("Unknown etype=%s" % etype)
+ else:
+ handler = GenericEvent
return handler(**kwargs)
diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py
index f3df849af2..42459f3f21 100644
--- a/synapse/api/events/room.py
+++ b/synapse/api/events/room.py
@@ -16,17 +16,45 @@
from . import SynapseEvent
+class GenericEvent(SynapseEvent):
+ def get_content_template(self):
+ return {}
+
+
class RoomTopicEvent(SynapseEvent):
TYPE = "m.room.topic"
+ internal_keys = SynapseEvent.internal_keys + [
+ "topic",
+ ]
+
def __init__(self, **kwargs):
kwargs["state_key"] = ""
+ if "topic" in kwargs["content"]:
+ kwargs["topic"] = kwargs["content"]["topic"]
super(RoomTopicEvent, self).__init__(**kwargs)
def get_content_template(self):
return {"topic": u"string"}
+class RoomNameEvent(SynapseEvent):
+ TYPE = "m.room.name"
+
+ internal_keys = SynapseEvent.internal_keys + [
+ "name",
+ ]
+
+ def __init__(self, **kwargs):
+ kwargs["state_key"] = ""
+ if "name" in kwargs["content"]:
+ kwargs["name"] = kwargs["content"]["name"]
+ super(RoomNameEvent, self).__init__(**kwargs)
+
+ def get_content_template(self):
+ return {"name": u"string"}
+
+
class RoomMemberEvent(SynapseEvent):
TYPE = "m.room.member"
@@ -38,6 +66,8 @@ class RoomMemberEvent(SynapseEvent):
def __init__(self, **kwargs):
if "target_user_id" in kwargs:
kwargs["state_key"] = kwargs["target_user_id"]
+ if "membership" not in kwargs:
+ kwargs["membership"] = kwargs.get("content", {}).get("membership")
super(RoomMemberEvent, self).__init__(**kwargs)
def get_content_template(self):
diff --git a/synapse/api/notifier.py b/synapse/api/notifier.py
index 65b5a4ebb3..9f622df6bb 100644
--- a/synapse/api/notifier.py
+++ b/synapse/api/notifier.py
@@ -15,6 +15,7 @@
from synapse.api.constants import Membership
from synapse.api.events.room import RoomMemberEvent
+from synapse.api.streams.event import EventsStreamData
from twisted.internet import defer
from twisted.internet import reactor
@@ -66,7 +67,7 @@ class Notifier(object):
self._notify_and_callback(
user_id=user_id,
event_data=event.get_dict(),
- stream_type=event.type,
+ stream_type=EventsStreamData.EVENT_TYPE,
store_id=store_id)
def on_new_user_event(self, user_id, event_data, stream_type, store_id):
diff --git a/synapse/api/streams/__init__.py b/synapse/api/streams/__init__.py
index 989e63f9ec..d831eafbab 100644
--- a/synapse/api/streams/__init__.py
+++ b/synapse/api/streams/__init__.py
@@ -20,23 +20,24 @@ class PaginationConfig(object):
"""A configuration object which stores pagination parameters."""
- def __init__(self, from_tok=None, to_tok=None, limit=0):
+ def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0):
self.from_tok = from_tok
self.to_tok = to_tok
+ self.direction = direction
self.limit = limit
@classmethod
def from_request(cls, request, raise_invalid_params=True):
params = {
- "from_tok": PaginationStream.TOK_START,
- "to_tok": PaginationStream.TOK_END,
- "limit": 0
+ "from_tok": "END",
+ "direction": 'f',
}
query_param_mappings = [ # 3-tuple of qp_key, attribute, rules
("from", "from_tok", lambda x: type(x) == str),
("to", "to_tok", lambda x: type(x) == str),
- ("limit", "limit", lambda x: x.isdigit())
+ ("limit", "limit", lambda x: x.isdigit()),
+ ("dir", "direction", lambda x: x == 'f' or x == 'b'),
]
for qp, attr, is_valid in query_param_mappings:
@@ -48,12 +49,17 @@ class PaginationConfig(object):
return PaginationConfig(**params)
+ def __str__(self):
+ return (
+ "<PaginationConfig from_tok=%s, to_tok=%s, "
+ "direction=%s, limit=%s>"
+ ) % (self.from_tok, self.to_tok, self.direction, self.limit)
+
class PaginationStream(object):
""" An interface for streaming data as chunks. """
- TOK_START = "START"
TOK_END = "END"
def get_chunk(self, config=None):
@@ -76,7 +82,7 @@ class StreamData(object):
self.hs = hs
self.store = hs.get_datastore()
- def get_rows(self, user_id, from_pkey, to_pkey, limit):
+ def get_rows(self, user_id, from_pkey, to_pkey, limit, direction):
""" Get event stream data between the specified pkeys.
Args:
diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py
index 4b6d739e54..a5c8b2b31f 100644
--- a/synapse/api/streams/event.py
+++ b/synapse/api/streams/event.py
@@ -18,6 +18,7 @@
from twisted.internet import defer
from synapse.api.errors import EventStreamError
+from synapse.api.events import SynapseEvent
from synapse.api.events.room import (
RoomMemberEvent, MessageEvent, FeedbackEvent, RoomTopicEvent
)
@@ -28,17 +29,17 @@ import logging
logger = logging.getLogger(__name__)
-class MessagesStreamData(StreamData):
- EVENT_TYPE = MessageEvent.TYPE
+class EventsStreamData(StreamData):
+ EVENT_TYPE = "EventsStream"
def __init__(self, hs, room_id=None, feedback=False):
- super(MessagesStreamData, self).__init__(hs)
+ super(EventsStreamData, self).__init__(hs)
self.room_id = room_id
self.with_feedback = feedback
@defer.inlineCallbacks
- def get_rows(self, user_id, from_key, to_key, limit):
- (data, latest_ver) = yield self.store.get_message_stream(
+ def get_rows(self, user_id, from_key, to_key, limit, direction):
+ data, latest_ver = yield self.store.get_room_events(
user_id=user_id,
from_key=from_key,
to_key=to_key,
@@ -50,74 +51,7 @@ class MessagesStreamData(StreamData):
@defer.inlineCallbacks
def max_token(self):
- val = yield self.store.get_max_message_id()
- defer.returnValue(val)
-
-
-class RoomMemberStreamData(StreamData):
- EVENT_TYPE = RoomMemberEvent.TYPE
-
- @defer.inlineCallbacks
- def get_rows(self, user_id, from_key, to_key, limit):
- (data, latest_ver) = yield self.store.get_room_member_stream(
- user_id=user_id,
- from_key=from_key,
- to_key=to_key
- )
-
- defer.returnValue((data, latest_ver))
-
- @defer.inlineCallbacks
- def max_token(self):
- val = yield self.store.get_max_room_member_id()
- defer.returnValue(val)
-
-
-class FeedbackStreamData(StreamData):
- EVENT_TYPE = FeedbackEvent.TYPE
-
- def __init__(self, hs, room_id=None):
- super(FeedbackStreamData, self).__init__(hs)
- self.room_id = room_id
-
- @defer.inlineCallbacks
- def get_rows(self, user_id, from_key, to_key, limit):
- (data, latest_ver) = yield self.store.get_feedback_stream(
- user_id=user_id,
- from_key=from_key,
- to_key=to_key,
- limit=limit,
- room_id=self.room_id
- )
- defer.returnValue((data, latest_ver))
-
- @defer.inlineCallbacks
- def max_token(self):
- val = yield self.store.get_max_feedback_id()
- defer.returnValue(val)
-
-
-class RoomDataStreamData(StreamData):
- EVENT_TYPE = RoomTopicEvent.TYPE # TODO need multiple event types
-
- def __init__(self, hs, room_id=None):
- super(RoomDataStreamData, self).__init__(hs)
- self.room_id = room_id
-
- @defer.inlineCallbacks
- def get_rows(self, user_id, from_key, to_key, limit):
- (data, latest_ver) = yield self.store.get_room_data_stream(
- user_id=user_id,
- from_key=from_key,
- to_key=to_key,
- limit=limit,
- room_id=self.room_id
- )
- defer.returnValue((data, latest_ver))
-
- @defer.inlineCallbacks
- def max_token(self):
- val = yield self.store.get_max_room_data_id()
+ val = yield self.store.get_room_events_max_id()
defer.returnValue(val)
@@ -136,6 +70,15 @@ class EventStream(PaginationStream):
pagination_config.from_tok)
pagination_config.to_tok = yield self.fix_token(
pagination_config.to_tok)
+
+ if (
+ not pagination_config.to_tok
+ and pagination_config.direction == 'f'
+ ):
+ pagination_config.to_tok = yield self.get_current_max_token()
+
+ logger.debug("pagination_config: %s", pagination_config)
+
defer.returnValue(pagination_config)
@defer.inlineCallbacks
@@ -147,39 +90,42 @@ class EventStream(PaginationStream):
Returns:
The fixed-up token, which may == token.
"""
- # replace TOK_START and TOK_END with 0_0_0 or -1_-1_-1 depending.
- replacements = [
- (PaginationStream.TOK_START, "0"),
- (PaginationStream.TOK_END, "-1")
- ]
- for magic_token, key in replacements:
- if magic_token == token:
- token = EventStream.SEPARATOR.join(
- [key] * len(self.stream_data)
- )
-
- # replace -1 values with an actual pkey
- token_segments = self._split_token(token)
- for i, tok in enumerate(token_segments):
- if tok == -1:
- # add 1 to the max token because results are EXCLUSIVE from the
- # latest version.
- token_segments[i] = 1 + (yield self.stream_data[i].max_token())
- defer.returnValue(EventStream.SEPARATOR.join(
- str(x) for x in token_segments
- ))
+ if token == PaginationStream.TOK_END:
+ new_token = yield self.get_current_max_token()
+
+ logger.debug("fix_token: From %s to %s", token, new_token)
+
+ token = new_token
+
+ defer.returnValue(token)
@defer.inlineCallbacks
- def get_chunk(self, config=None):
+ def get_current_max_token(self):
+ new_token_parts = []
+ for s in self.stream_data:
+ mx = yield s.max_token()
+ new_token_parts.append(str(mx))
+
+ new_token = EventStream.SEPARATOR.join(new_token_parts)
+
+ logger.debug("get_current_max_token: %s", new_token)
+
+ defer.returnValue(new_token)
+
+ @defer.inlineCallbacks
+ def get_chunk(self, config):
# no support for limit on >1 streams, makes no sense.
if config.limit and len(self.stream_data) > 1:
raise EventStreamError(
400, "Limit not supported on multiplexed streams."
)
- (chunk_data, next_tok) = yield self._get_chunk_data(config.from_tok,
- config.to_tok,
- config.limit)
+ chunk_data, next_tok = yield self._get_chunk_data(
+ config.from_tok,
+ config.to_tok,
+ config.limit,
+ config.direction,
+ )
defer.returnValue({
"chunk": chunk_data,
@@ -188,7 +134,7 @@ class EventStream(PaginationStream):
})
@defer.inlineCallbacks
- def _get_chunk_data(self, from_tok, to_tok, limit):
+ def _get_chunk_data(self, from_tok, to_tok, limit, direction):
""" Get event data between the two tokens.
Tokens are SEPARATOR separated values representing pkey values of
@@ -206,11 +152,12 @@ class EventStream(PaginationStream):
EventStreamError if something went wrong.
"""
# sanity check
- if (from_tok.count(EventStream.SEPARATOR) !=
- to_tok.count(EventStream.SEPARATOR) or
- (from_tok.count(EventStream.SEPARATOR) + 1) !=
- len(self.stream_data)):
- raise EventStreamError(400, "Token lengths don't match.")
+ if to_tok is not None:
+ if (from_tok.count(EventStream.SEPARATOR) !=
+ to_tok.count(EventStream.SEPARATOR) or
+ (from_tok.count(EventStream.SEPARATOR) + 1) !=
+ len(self.stream_data)):
+ raise EventStreamError(400, "Token lengths don't match.")
chunk = []
next_ver = []
@@ -224,10 +171,13 @@ class EventStream(PaginationStream):
continue
(event_chunk, max_pkey) = yield self.stream_data[i].get_rows(
- self.user_id, from_pkey, to_pkey, limit
+ self.user_id, from_pkey, to_pkey, limit, direction,
)
- chunk += event_chunk
+ chunk.extend([
+ e.get_dict() if isinstance(e, SynapseEvent) else e
+ for e in event_chunk
+ ])
next_ver.append(str(max_pkey))
defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver)))
@@ -240,9 +190,8 @@ class EventStream(PaginationStream):
Returns:
A list of ints.
"""
- segments = token.split(EventStream.SEPARATOR)
- try:
- int_segments = [int(x) for x in segments]
- except ValueError:
- raise EventStreamError(400, "Bad token: %s" % token)
- return int_segments
+ if token:
+ segments = token.split(EventStream.SEPARATOR)
+ else:
+ segments = [None] * len(self.stream_data)
+ return segments
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index ca102236cf..495149466c 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -56,7 +56,7 @@ class SynapseHomeServer(HomeServer):
return File("webclient") # TODO configurable?
def build_resource_for_content_repo(self):
- return ContentRepoResource("uploads", self.auth)
+ return ContentRepoResource(self, self.upload_dir, self.auth)
def build_db_pool(self):
""" Set up all the dbs. Since all the *.sql have IF NOT EXISTS, so we
@@ -235,8 +235,8 @@ def setup():
parser.add_argument('--pid-file', dest="pid", help="When running as a "
"daemon, the file to store the pid in",
default="hs.pid")
- parser.add_argument("-w", "--webclient", dest="webclient",
- action="store_true", help="Host the web client.")
+ parser.add_argument("-W", "--webclient", dest="webclient", default=True,
+ action="store_false", help="Don't host a web client.")
args = parser.parse_args()
verbosity = int(args.verbose) if args.verbose else None
@@ -257,7 +257,8 @@ def setup():
hs = SynapseHomeServer(
args.host,
- db_name=db_name
+ upload_dir=os.path.abspath("uploads"),
+ db_name=db_name,
)
# This object doesn't need to be saved because it's set as the handler for
diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py
index 580e591aca..984c1558e9 100644
--- a/synapse/federation/handler.py
+++ b/synapse/federation/handler.py
@@ -63,7 +63,7 @@ class FederationEventHandler(object):
Deferred: Resolved when it has successfully been queued for
processing.
"""
- yield self._fill_out_prev_events(event)
+ yield self.fill_out_prev_events(event)
pdu = self.pdu_codec.pdu_from_event(event)
@@ -74,10 +74,18 @@ class FederationEventHandler(object):
@log_function
@defer.inlineCallbacks
- def backfill(self, room_id, limit):
- # TODO: Work out which destinations to ask for backfill
- # self.replication_layer.backfill(dest, room_id, limit)
- pass
+ def backfill(self, dest, room_id, limit):
+ pdus = yield self.replication_layer.backfill(dest, room_id, limit)
+
+ if not pdus:
+ defer.returnValue([])
+
+ events = [
+ self.pdu_codec.event_from_pdu(pdu)
+ for pdu in pdus
+ ]
+
+ defer.returnValue(events)
@log_function
def get_state_for_room(self, destination, room_id):
@@ -87,7 +95,7 @@ class FederationEventHandler(object):
@log_function
@defer.inlineCallbacks
- def on_receive_pdu(self, pdu):
+ def on_receive_pdu(self, pdu, backfilled):
""" Called by the ReplicationLayer when we have a new pdu. We need to
do auth checks and put it throught the StateHandler.
"""
@@ -95,7 +103,7 @@ class FederationEventHandler(object):
try:
with (yield self.lock_manager.lock(pdu.context)):
- if event.is_state:
+ if event.is_state and not backfilled:
is_new_state = yield self.state_handler.handle_new_state(
pdu
)
@@ -104,7 +112,7 @@ class FederationEventHandler(object):
else:
is_new_state = False
- yield self.event_handler.on_receive(event, is_new_state)
+ yield self.event_handler.on_receive(event, is_new_state, backfilled)
except AuthError:
# TODO: Implement something in federation that allows us to
@@ -129,7 +137,7 @@ class FederationEventHandler(object):
yield self.event_handler.on_receive(new_state_event)
@defer.inlineCallbacks
- def _fill_out_prev_events(self, event):
+ def fill_out_prev_events(self, event):
if hasattr(event, "prev_events"):
return
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index c9f2e06b7b..8030d0963f 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -209,7 +209,7 @@ class ReplicationLayer(object):
pdus = [Pdu(outlier=False, **p) for p in transaction.pdus]
for pdu in pdus:
- yield self._handle_new_pdu(pdu)
+ yield self._handle_new_pdu(pdu, backfilled=True)
defer.returnValue(pdus)
@@ -416,7 +416,7 @@ class ReplicationLayer(object):
@defer.inlineCallbacks
@log_function
- def _handle_new_pdu(self, pdu):
+ def _handle_new_pdu(self, pdu, backfilled=False):
# We reprocess pdus when we have seen them only as outliers
existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin)
@@ -452,7 +452,10 @@ class ReplicationLayer(object):
# Persist the Pdu, but don't mark it as processed yet.
yield self.pdu_actions.persist_received(pdu)
- ret = yield self.handler.on_receive_pdu(pdu)
+ if not backfilled:
+ ret = yield self.handler.on_receive_pdu(pdu, backfilled=backfilled)
+ else:
+ ret = None
yield self.pdu_actions.mark_as_processed(pdu)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index c2f4685c92..3f07b5aa4a 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -24,4 +24,5 @@ class BaseHandler(object):
self.notifier = hs.get_notifier()
self.room_lock = hs.get_room_lock_manager()
self.state_handler = hs.get_state_handler()
+ self.distributor = hs.get_distributor()
self.hs = hs
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 3af7d824a2..6bb797caf2 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -17,8 +17,7 @@ from twisted.internet import defer
from ._base import BaseHandler
from synapse.api.streams.event import (
- EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData,
- RoomDataStreamData
+ EventStream, EventsStreamData
)
from synapse.handlers.presence import PresenceStreamData
@@ -26,10 +25,7 @@ from synapse.handlers.presence import PresenceStreamData
class EventStreamHandler(BaseHandler):
stream_data_classes = [
- MessagesStreamData,
- RoomMemberStreamData,
- FeedbackStreamData,
- RoomDataStreamData,
+ EventsStreamData,
PresenceStreamData,
]
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 7026df90a2..9cff444779 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -32,10 +32,19 @@ logger = logging.getLogger(__name__)
class FederationHandler(BaseHandler):
"""Handles events that originated from federation."""
+ def __init__(self, hs):
+ super(FederationHandler, self).__init__(hs)
+
+ self.distributor.observe(
+ "user_joined_room",
+ self._on_user_joined
+ )
+
+ self.waiting_for_join_list = {}
@log_function
@defer.inlineCallbacks
- def on_receive(self, event, is_new_state):
+ def on_receive(self, event, is_new_state, backfilled):
if hasattr(event, "state_key") and not is_new_state:
logger.debug("Ignoring old state.")
return
@@ -70,6 +79,115 @@ class FederationHandler(BaseHandler):
else:
with (yield self.room_lock.lock(event.room_id)):
- store_id = yield self.store.persist_event(event)
+ store_id = yield self.store.persist_event(event, backfilled)
+
+ room = yield self.store.get_room(event.room_id)
+
+ if not room:
+ # Huh, let's try and get the current state
+ try:
+ federation = self.hs.get_federation()
+ yield federation.get_state_for_room(
+ event.origin, event.room_id
+ )
+
+ hosts = yield self.store.get_joined_hosts_for_room(
+ event.room_id
+ )
+ if self.hs.hostname in hosts:
+ try:
+ yield self.store.store_room(
+ event.room_id,
+ "",
+ is_public=False
+ )
+ except:
+ pass
+ except:
+ logger.exception(
+ "Failed to get current state for room %s",
+ event.room_id
+ )
+
+ if not backfilled:
+ yield self.notifier.on_new_room_event(event, store_id)
+
+ if event.type == RoomMemberEvent.TYPE:
+ if event.membership == Membership.JOIN:
+ user = self.hs.parse_userid(event.target_user_id)
+ self.distributor.fire(
+ "user_joined_room", user=user, room_id=event.room_id
+ )
+
+
+ @log_function
+ @defer.inlineCallbacks
+ def backfill(self, dest, room_id, limit):
+ events = yield self.hs.get_federation().backfill(dest, room_id, limit)
+
+ for event in events:
+ try:
+ yield self.store.persist_event(event, backfilled=True)
+ except:
+ logger.exception("Failed to persist event: %s", event)
+
+ defer.returnValue(events)
- yield self.notifier.on_new_room_event(event, store_id)
+ @log_function
+ @defer.inlineCallbacks
+ def do_invite_join(self, target_host, room_id, joinee, content):
+ federation = self.hs.get_federation()
+
+ hosts = yield self.store.get_joined_hosts_for_room(room_id)
+ if self.hs.hostname in hosts:
+ # We are already in the room.
+ logger.debug("We're already in the room apparently")
+ defer.returnValue(False)
+
+ # First get current state to see if we are already joined.
+ try:
+ yield federation.get_state_for_room(target_host, room_id)
+
+ hosts = yield self.store.get_joined_hosts_for_room(room_id)
+ if self.hs.hostname in hosts:
+ # Oh, we were actually in the room already.
+ logger.debug("We're already in the room apparently")
+ defer.returnValue(False)
+ except Exception:
+ logger.exception("Failed to get current state")
+
+ new_event = self.event_factory.create_event(
+ etype=InviteJoinEvent.TYPE,
+ target_host=target_host,
+ room_id=room_id,
+ user_id=joinee,
+ content=content
+ )
+
+ new_event.destinations = [target_host]
+
+ yield federation.handle_new_event(new_event)
+
+ # TODO (erikj): Time out here.
+ d = defer.Deferred()
+ self.waiting_for_join_list.setdefault((joinee, room_id), []).append(d)
+ yield d
+
+ try:
+ yield self.store.store_room(
+ event.room_id,
+ "",
+ is_public=False
+ )
+ except:
+ pass
+
+
+ defer.returnValue(True)
+
+
+ @log_function
+ def _on_user_joined(self, user, room_id):
+ waiters = self.waiting_for_join_list.get((user.to_string(), room_id), [])
+ while waiters:
+ waiters.pop().callback(None)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index a2152c99cf..540e114b82 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -685,7 +685,10 @@ class PresenceStreamData(StreamData):
super(PresenceStreamData, self).__init__(hs)
self.presence = hs.get_handlers().presence_handler
- def get_rows(self, user_id, from_key, to_key, limit):
+ def get_rows(self, user_id, from_key, to_key, limit, direction):
+ from_key = int(from_key)
+ to_key = int(to_key)
+
cachemap = self.presence._user_cachemap
# TODO(paul): limit, and filter by visibility
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5d0379254b..899b653fb7 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -23,7 +23,8 @@ from synapse.api.events.room import (
RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent,
RoomConfigEvent
)
-from synapse.api.streams.event import EventStream, MessagesStreamData
+from synapse.api.streams.event import EventStream, EventsStreamData
+from synapse.handlers.presence import PresenceStreamData
from synapse.util import stringutils
from ._base import BaseHandler
@@ -59,12 +60,14 @@ class MessageHandler(BaseHandler):
yield self.auth.check_joined_room(room_id, user_id)
# Pull out the message from the db
- msg = yield self.store.get_message(room_id=room_id,
- msg_id=msg_id,
- user_id=sender_id)
+# msg = yield self.store.get_message(
+# room_id=room_id,
+# msg_id=msg_id,
+# user_id=sender_id
+# )
+
+ # TODO (erikj): Once we work out the correct c-s api we need to think on how to do this.
- if msg:
- defer.returnValue(msg)
defer.returnValue(None)
@defer.inlineCallbacks
@@ -114,8 +117,9 @@ class MessageHandler(BaseHandler):
"""
yield self.auth.check_joined_room(room_id, user_id)
- data_source = [MessagesStreamData(self.hs, room_id=room_id,
- feedback=feedback)]
+ data_source = [
+ EventsStreamData(self.hs, room_id=room_id, feedback=feedback)
+ ]
event_stream = EventStream(user_id, data_source)
pagin_config = yield event_stream.fix_tokens(pagin_config)
data_chunk = yield event_stream.get_chunk(config=pagin_config)
@@ -141,12 +145,7 @@ class MessageHandler(BaseHandler):
yield self.state_handler.handle_new_event(event)
# store in db
- store_id = yield self.store.store_room_data(
- room_id=event.room_id,
- etype=event.type,
- state_key=event.state_key,
- content=json.dumps(event.content)
- )
+ store_id = yield self.store.persist_event(event)
event.destinations = yield self.store.get_joined_hosts_for_room(
event.room_id
@@ -201,19 +200,17 @@ class MessageHandler(BaseHandler):
raise RoomError(
403, "Member does not meet private room rules.")
- data = yield self.store.get_room_data(room_id, event_type, state_key)
+ data = yield self.store.get_current_state(
+ room_id, event_type, state_key
+ )
defer.returnValue(data)
@defer.inlineCallbacks
- def get_feedback(self, room_id=None, msg_sender_id=None, msg_id=None,
- user_id=None, fb_sender_id=None, fb_type=None):
- yield self.auth.check_joined_room(room_id, user_id)
+ def get_feedback(self, event_id):
+ # yield self.auth.check_joined_room(room_id, user_id)
# Pull out the feedback from the db
- fb = yield self.store.get_feedback(
- room_id=room_id, msg_id=msg_id, msg_sender_id=msg_sender_id,
- fb_sender_id=fb_sender_id, fb_type=fb_type
- )
+ fb = yield self.store.get_feedback(event_id)
if fb:
defer.returnValue(fb)
@@ -260,20 +257,59 @@ class MessageHandler(BaseHandler):
user_id=user_id,
membership_list=[Membership.INVITE, Membership.JOIN]
)
- for room_info in room_list:
- if room_info["membership"] != Membership.JOIN:
+
+ rooms_ret = []
+
+ now_rooms_token = yield self.store.get_room_events_max_id()
+
+ # FIXME (erikj): Fix this.
+ presence_stream = PresenceStreamData(self.hs)
+ now_presence_token = yield presence_stream.max_token()
+ presence = yield presence_stream.get_rows(
+ user_id, 0, now_presence_token, None, None
+ )
+
+ # FIXME (erikj): We need to not generate this token,
+ now_token = "%s_%s" % (now_rooms_token, now_presence_token)
+
+ for event in room_list:
+ d = {
+ "room_id": event.room_id,
+ "membership": event.membership,
+ }
+
+ if event.membership == Membership.INVITE:
+ d["inviter"] = event.user_id
+
+ rooms_ret.append(d)
+
+ if event.membership != Membership.JOIN:
continue
try:
- event_chunk = yield self.get_messages(
- user_id=user_id,
- pagin_config=pagin_config,
- feedback=feedback,
- room_id=room_info["room_id"]
+ messages, token = yield self.store.get_recent_events_for_room(
+ event.room_id,
+ limit=10,
+ end_token=now_rooms_token,
)
- room_info["messages"] = event_chunk
+
+ d["messages"] = {
+ "chunk": [m.get_dict() for m in messages],
+ "start": token[0],
+ "end": token[1],
+ }
+
+ current_state = yield self.store.get_current_state(event.room_id)
+ d["state"] = [c.get_dict() for c in current_state]
except:
- pass
- defer.returnValue(room_list)
+ logger.exception("Failed to get snapshot")
+
+ user = self.hs.parse_userid(user_id)
+
+ ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token}
+
+ logger.debug("snapshot_all_rooms returning: %s", ret)
+
+ defer.returnValue(ret)
class RoomCreationHandler(BaseHandler):
@@ -372,7 +408,6 @@ class RoomCreationHandler(BaseHandler):
yield self.hs.get_handlers().room_member_handler.change_membership(
join_event,
- broadcast_msg=True,
do_auth=False
)
@@ -451,11 +486,11 @@ class RoomMemberHandler(BaseHandler):
member_list = yield self.store.get_room_members(room_id=room_id)
event_list = [
- entry.as_event(self.event_factory).get_dict()
+ entry.get_dict()
for entry in member_list
]
chunk_data = {
- "start": "START",
+ "start": "START", # FIXME (erikj): START is no longer a valid value
"end": "END",
"chunk": event_list
}
@@ -484,29 +519,28 @@ class RoomMemberHandler(BaseHandler):
defer.returnValue(member)
@defer.inlineCallbacks
- def change_membership(self, event=None, broadcast_msg=False, do_auth=True):
+ def change_membership(self, event=None, do_auth=True):
""" Change the membership status of a user in a room.
Args:
event (SynapseEvent): The membership event
- broadcast_msg (bool): True to inject a membership message into this
- room on success.
Raises:
SynapseError if there was a problem changing the membership.
"""
- #broadcast_msg = False
-
prev_state = yield self.store.get_room_member(
event.target_user_id, event.room_id
)
- if prev_state and prev_state.membership == event.membership:
- # treat this event as a NOOP.
- if do_auth: # This is mainly to fix a unit test.
- yield self.auth.check(event, raises=True)
- defer.returnValue({})
- return
+ if prev_state:
+ event.content["prev"] = prev_state.membership
+
+# if prev_state and prev_state.membership == event.membership:
+# # treat this event as a NOOP.
+# if do_auth: # This is mainly to fix a unit test.
+# yield self.auth.check(event, raises=True)
+# defer.returnValue({})
+# return
room_id = event.room_id
@@ -514,9 +548,7 @@ class RoomMemberHandler(BaseHandler):
# if this HS is not currently in the room, i.e. we have to do the
# invite/join dance.
if event.membership == Membership.JOIN:
- yield self._do_join(
- event, do_auth=do_auth, broadcast_msg=broadcast_msg
- )
+ yield self._do_join(event, do_auth=do_auth)
else:
# This is not a JOIN, so we can handle it normally.
if do_auth:
@@ -534,7 +566,6 @@ class RoomMemberHandler(BaseHandler):
yield self._do_local_membership_update(
event,
membership=event.content["membership"],
- broadcast_msg=broadcast_msg,
)
defer.returnValue({"room_id": room_id})
@@ -569,14 +600,14 @@ class RoomMemberHandler(BaseHandler):
defer.returnValue({"room_id": room_id})
@defer.inlineCallbacks
- def _do_join(self, event, room_host=None, do_auth=True, broadcast_msg=True):
+ def _do_join(self, event, room_host=None, do_auth=True):
joinee = self.hs.parse_userid(event.target_user_id)
# room_id = RoomID.from_string(event.room_id, self.hs)
room_id = event.room_id
# If event doesn't include a display name, add one.
- yield self._fill_out_join_content(
- joinee, event.content
+ yield self.distributor.fire(
+ "collect_presencelike_data", joinee, event.content
)
# XXX: We don't do an auth check if we are doing an invite
@@ -584,9 +615,9 @@ class RoomMemberHandler(BaseHandler):
# that we are allowed to join when we decide whether or not we
# need to do the invite/join dance.
- room = yield self.store.get_room(room_id)
+ hosts = yield self.store.get_joined_hosts_for_room(room_id)
- if room:
+ if self.hs.hostname in hosts:
should_do_dance = False
elif room_host:
should_do_dance = True
@@ -598,7 +629,7 @@ class RoomMemberHandler(BaseHandler):
if prev_state and prev_state.membership == Membership.INVITE:
room = yield self.store.get_room(room_id)
inviter = UserID.from_string(
- prev_state.sender, self.hs
+ prev_state.user_id, self.hs
)
should_do_dance = not inviter.is_mine and not room
@@ -606,8 +637,15 @@ class RoomMemberHandler(BaseHandler):
else:
should_do_dance = False
+ have_joined = False
+ if should_do_dance:
+ handler = self.hs.get_handlers().federation_handler
+ have_joined = yield handler.do_invite_join(
+ room_host, room_id, event.user_id, event.content
+ )
+
# We want to do the _do_update inside the room lock.
- if not should_do_dance:
+ if not have_joined:
logger.debug("Doing normal join")
if do_auth:
@@ -617,16 +655,6 @@ class RoomMemberHandler(BaseHandler):
yield self._do_local_membership_update(
event,
membership=event.content["membership"],
- broadcast_msg=broadcast_msg,
- )
-
-
- if should_do_dance:
- yield self._do_invite_join_dance(
- room_id=room_id,
- joinee=event.user_id,
- target_host=room_host,
- content=event.content,
)
user = self.hs.parse_userid(event.user_id)
@@ -635,32 +663,6 @@ class RoomMemberHandler(BaseHandler):
)
@defer.inlineCallbacks
- def _fill_out_join_content(self, user_id, content):
- # If event doesn't include a display name, add one.
- profile_handler = self.hs.get_handlers().profile_handler
- if "displayname" not in content:
- try:
- display_name = yield profile_handler.get_displayname(
- user_id
- )
-
- if display_name:
- content["displayname"] = display_name
- except:
- logger.exception("Failed to set display_name")
-
- if "avatar_url" not in content:
- try:
- avatar_url = yield profile_handler.get_avatar_url(
- user_id
- )
-
- if avatar_url:
- content["avatar_url"] = avatar_url
- except:
- logger.exception("Failed to set display_name")
-
- @defer.inlineCallbacks
def _should_invite_join(self, room_id, prev_state, do_auth):
logger.debug("_should_invite_join: room_id: %s", room_id)
@@ -694,18 +696,12 @@ class RoomMemberHandler(BaseHandler):
user_id=user.to_string(), membership_list=membership_list
)
- defer.returnValue([r["room_id"] for r in rooms])
+ defer.returnValue([r.room_id for r in rooms])
@defer.inlineCallbacks
- def _do_local_membership_update(self, event, membership, broadcast_msg):
+ def _do_local_membership_update(self, event, membership):
# store membership
- store_id = yield self.store.store_room_member(
- user_id=event.target_user_id,
- sender=event.user_id,
- room_id=event.room_id,
- content=event.content,
- membership=membership
- )
+ store_id = yield self.store.persist_event(event)
# Send a PDU to all hosts who have joined the room.
destinations = yield self.store.get_joined_hosts_for_room(
@@ -732,78 +728,11 @@ class RoomMemberHandler(BaseHandler):
yield self.hs.get_federation().handle_new_event(event)
self.notifier.on_new_room_event(event, store_id)
- if broadcast_msg:
- yield self._inject_membership_msg(
- source=event.user_id,
- target=event.target_user_id,
- room_id=event.room_id,
- membership=event.content["membership"]
- )
-
- @defer.inlineCallbacks
- def _do_invite_join_dance(self, room_id, joinee, target_host, content):
- logger.debug("Doing remote join dance")
-
- # do invite join dance
- federation = self.hs.get_federation()
- new_event = self.event_factory.create_event(
- etype=InviteJoinEvent.TYPE,
- target_host=target_host,
- room_id=room_id,
- user_id=joinee,
- content=content
- )
-
- new_event.destinations = [target_host]
-
- yield self.store.store_room(
- room_id, "", is_public=False
- )
-
- #yield self.state_handler.handle_new_event(event)
- yield federation.handle_new_event(new_event)
- yield federation.get_state_for_room(
- target_host, room_id
- )
-
- @defer.inlineCallbacks
- def _inject_membership_msg(self, room_id=None, source=None, target=None,
- membership=None):
- # TODO this should be a different type of message, not m.text
- if membership == Membership.INVITE:
- body = "%s invited %s to the room." % (source, target)
- elif membership == Membership.JOIN:
- body = "%s joined the room." % (target)
- elif membership == Membership.LEAVE:
- body = "%s left the room." % (target)
- else:
- raise RoomError(500, "Unknown membership value %s" % membership)
-
- membership_json = {
- "msgtype": u"m.text",
- "body": body,
- "membership_source": source,
- "membership_target": target,
- "membership": membership,
- }
-
- msg_id = "m%s" % int(self.clock.time_msec())
-
- event = self.event_factory.create_event(
- etype=MessageEvent.TYPE,
- room_id=room_id,
- user_id="_homeserver_",
- msg_id=msg_id,
- content=membership_json
- )
-
- handler = self.hs.get_handlers().message_handler
- yield handler.send_message(event, suppress_auth=True)
-
class RoomListHandler(BaseHandler):
@defer.inlineCallbacks
def get_public_room_list(self):
- chunk = yield self.store.get_rooms(is_public=True, with_topics=True)
+ chunk = yield self.store.get_rooms(is_public=True)
+ # FIXME (erikj): START is no longer a valid value
defer.returnValue({"start": "START", "end": "END", "chunk": chunk})
diff --git a/synapse/http/server.py b/synapse/http/server.py
index c28d9a33f9..d1f99460c1 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -212,8 +212,9 @@ class ContentRepoResource(resource.Resource):
"""
isLeaf = True
- def __init__(self, directory, auth):
+ def __init__(self, hs, directory, auth):
resource.Resource.__init__(self)
+ self.hs = hs
self.directory = directory
self.auth = auth
@@ -250,7 +251,8 @@ class ContentRepoResource(resource.Resource):
file_ext = re.sub("[^a-z]", "", file_ext)
suffix += "." + file_ext
- file_path = os.path.join(self.directory, prefix + main_part + suffix)
+ file_name = prefix + main_part + suffix
+ file_path = os.path.join(self.directory, file_name)
logger.info("User %s is uploading a file to path %s",
auth_user.to_string(),
file_path)
@@ -259,8 +261,8 @@ class ContentRepoResource(resource.Resource):
attempts = 0
while os.path.exists(file_path):
main_part = random_string(24)
- file_path = os.path.join(self.directory,
- prefix + main_part + suffix)
+ file_name = prefix + main_part + suffix
+ file_path = os.path.join(self.directory, file_name)
attempts += 1
if attempts > 25: # really? Really?
raise SynapseError(500, "Unable to create file.")
@@ -272,11 +274,14 @@ class ContentRepoResource(resource.Resource):
# servers.
# TODO: A little crude here, we could do this better.
- filename = request.path.split(self.directory + "/")[1]
+ filename = request.path.split('/')[-1]
# be paranoid
filename = re.sub("[^0-9A-z.-_]", "", filename)
file_path = self.directory + "/" + filename
+
+ logger.debug("Searching for %s", file_path)
+
if os.path.isfile(file_path):
# filename has the content type
base64_contentype = filename.split(".")[1]
@@ -304,6 +309,10 @@ class ContentRepoResource(resource.Resource):
self._async_render(request)
return server.NOT_DONE_YET
+ def render_OPTIONS(self, request):
+ respond_with_json_bytes(request, 200, {}, send_cors=True)
+ return server.NOT_DONE_YET
+
@defer.inlineCallbacks
def _async_render(self, request):
try:
@@ -313,8 +322,13 @@ class ContentRepoResource(resource.Resource):
with open(fname, "wb") as f:
f.write(request.content.read())
+
+ # FIXME (erikj): These should use constants.
+ file_name = os.path.basename(fname)
+ url = "http://%s/matrix/content/%s" % (self.hs.hostname, file_name)
+
respond_with_json_bytes(request, 200,
- json.dumps({"content_token": fname}),
+ json.dumps({"content_token": url}),
send_cors=True)
except CodeMessageException as e:
diff --git a/synapse/rest/room.py b/synapse/rest/room.py
index db8f18e8b3..f5b547b963 100644
--- a/synapse/rest/room.py
+++ b/synapse/rest/room.py
@@ -115,7 +115,7 @@ class RoomTopicRestServlet(RestServlet):
if not data:
raise SynapseError(404, "Topic not found.", errcode=Codes.NOT_FOUND)
- defer.returnValue((200, json.loads(data.content)))
+ defer.returnValue((200, data.content))
@defer.inlineCallbacks
def on_PUT(self, request, room_id):
@@ -177,7 +177,7 @@ class RoomMemberRestServlet(RestServlet):
if not member:
raise SynapseError(404, "Member not found.",
errcode=Codes.NOT_FOUND)
- defer.returnValue((200, json.loads(member.content)))
+ defer.returnValue((200, member.content))
@defer.inlineCallbacks
def on_DELETE(self, request, roomid, target_user_id):
@@ -193,7 +193,7 @@ class RoomMemberRestServlet(RestServlet):
)
handler = self.handlers.room_member_handler
- yield handler.change_membership(event, broadcast_msg=True)
+ yield handler.change_membership(event)
defer.returnValue((200, ""))
@defer.inlineCallbacks
@@ -220,7 +220,7 @@ class RoomMemberRestServlet(RestServlet):
)
handler = self.handlers.room_member_handler
- yield handler.change_membership(event, broadcast_msg=True)
+ yield handler.change_membership(event)
defer.returnValue((200, ""))
@@ -287,25 +287,28 @@ class FeedbackRestServlet(RestServlet):
feedback_type):
user = yield (self.auth.get_user_by_req(request))
- if feedback_type not in Feedback.LIST:
- raise SynapseError(400, "Bad feedback type.",
- errcode=Codes.BAD_JSON)
-
- msg_handler = self.handlers.message_handler
- feedback = yield msg_handler.get_feedback(
- room_id=urllib.unquote(room_id),
- msg_sender_id=msg_sender_id,
- msg_id=msg_id,
- user_id=user.to_string(),
- fb_sender_id=fb_sender_id,
- fb_type=feedback_type
- )
-
- if not feedback:
- raise SynapseError(404, "Feedback not found.",
- errcode=Codes.NOT_FOUND)
+ # TODO (erikj): Implement this?
+ raise NotImplementedError("Getting feedback is not supported")
- defer.returnValue((200, json.loads(feedback.content)))
+# if feedback_type not in Feedback.LIST:
+# raise SynapseError(400, "Bad feedback type.",
+# errcode=Codes.BAD_JSON)
+#
+# msg_handler = self.handlers.message_handler
+# feedback = yield msg_handler.get_feedback(
+# room_id=urllib.unquote(room_id),
+# msg_sender_id=msg_sender_id,
+# msg_id=msg_id,
+# user_id=user.to_string(),
+# fb_sender_id=fb_sender_id,
+# fb_type=feedback_type
+# )
+#
+# if not feedback:
+# raise SynapseError(404, "Feedback not found.",
+# errcode=Codes.NOT_FOUND)
+#
+# defer.returnValue((200, json.loads(feedback.content)))
@defer.inlineCallbacks
def on_PUT(self, request, room_id, sender_id, msg_id, fb_sender_id,
@@ -382,6 +385,21 @@ class RoomMessageListRestServlet(RestServlet):
defer.returnValue((200, msgs))
+class RoomTriggerBackfill(RestServlet):
+ PATTERN = client_path_pattern("/rooms/(?P<room_id>[^/]*)/backfill$")
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, room_id):
+ remote_server = urllib.unquote(request.args["remote"][0])
+ room_id = urllib.unquote(room_id)
+ limit = int(request.args["limit"][0])
+
+ handler = self.handlers.federation_handler
+ events = yield handler.backfill(remote_server, room_id, limit)
+
+ res = [event.get_dict() for event in events]
+ defer.returnValue((200, res))
+
def _parse_json(request):
try:
content = json.loads(request.content.read())
@@ -402,3 +420,4 @@ def register_servlets(hs, http_server):
RoomMemberListRestServlet(hs).register(http_server)
RoomMessageListRestServlet(hs).register(http_server)
JoinRoomAliasServlet(hs).register(http_server)
+ RoomTriggerBackfill(hs).register(http_server)
diff --git a/synapse/server.py b/synapse/server.py
index d4c2481483..c5b0a32757 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -159,7 +159,7 @@ class HomeServer(BaseHomeServer):
return DataStore(self)
def build_event_factory(self):
- return EventFactory()
+ return EventFactory(self)
def build_handlers(self):
return Handlers(self)
diff --git a/synapse/state.py b/synapse/state.py
index 4f8b4d9760..ca8e1ca630 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -86,7 +86,7 @@ class StateHandler(object):
else:
event.depth = 0
- current_state = yield self.store.get_current_state(
+ current_state = yield self.store.get_current_state_pdu(
key.context, key.type, key.state_key
)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 5d5b5f7c44..d06033b980 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -13,30 +13,35 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from twisted.internet import defer
from synapse.api.events.room import (
RoomMemberEvent, MessageEvent, RoomTopicEvent, FeedbackEvent,
- RoomConfigEvent
+ RoomConfigEvent, RoomNameEvent,
)
+from synapse.util.logutils import log_function
+
from .directory import DirectoryStore
from .feedback import FeedbackStore
-from .message import MessageStore
from .presence import PresenceStore
from .profile import ProfileStore
from .registration import RegistrationStore
from .room import RoomStore
from .roommember import RoomMemberStore
-from .roomdata import RoomDataStore
from .stream import StreamStore
from .pdu import StatePduStore, PduStore
from .transactions import TransactionStore
import json
+import logging
import os
-class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore,
+logger = logging.getLogger(__name__)
+
+
+class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
PresenceStore, PduStore, StatePduStore, TransactionStore,
DirectoryStore):
@@ -44,51 +49,147 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore,
def __init__(self, hs):
super(DataStore, self).__init__(hs)
self.event_factory = hs.get_event_factory()
+ self.hs = hs
- def persist_event(self, event):
- if event.type == MessageEvent.TYPE:
- return self.store_message(
- user_id=event.user_id,
- room_id=event.room_id,
- msg_id=event.msg_id,
- content=json.dumps(event.content)
- )
- elif event.type == RoomMemberEvent.TYPE:
- return self.store_room_member(
- user_id=event.target_user_id,
- sender=event.user_id,
- room_id=event.room_id,
- content=event.content,
- membership=event.content["membership"]
- )
+ self.min_token_deferred = self._get_min_token()
+ self.min_token = None
+
+ @defer.inlineCallbacks
+ @log_function
+ def persist_event(self, event, backfilled=False):
+ if event.type == RoomMemberEvent.TYPE:
+ yield self._store_room_member(event)
elif event.type == FeedbackEvent.TYPE:
- return self.store_feedback(
- room_id=event.room_id,
- msg_id=event.msg_id,
- msg_sender_id=event.msg_sender_id,
- fb_sender_id=event.user_id,
- fb_type=event.feedback_type,
- content=json.dumps(event.content)
- )
+ yield self._store_feedback(event)
+# elif event.type == RoomConfigEvent.TYPE:
+# yield self._store_room_config(event)
+ elif event.type == RoomNameEvent.TYPE:
+ yield self._store_room_name(event)
elif event.type == RoomTopicEvent.TYPE:
- return self.store_room_data(
- room_id=event.room_id,
- etype=event.type,
- state_key=event.state_key,
- content=json.dumps(event.content)
+ yield self._store_room_topic(event)
+
+ ret = yield self._store_event(event, backfilled)
+ defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def get_event(self, event_id):
+ events_dict = yield self._simple_select_one(
+ "events",
+ {"event_id": event_id},
+ [
+ "event_id",
+ "type",
+ "sender",
+ "room_id",
+ "content",
+ "unrecognized_keys"
+ ],
+ )
+
+ event = self._parse_event_from_row(events_dict)
+ defer.returnValue(event)
+
+ @defer.inlineCallbacks
+ @log_function
+ def _store_event(self, event, backfilled):
+ # FIXME (erikj): This should be removed when we start amalgamating
+ # event and pdu storage
+ yield self.hs.get_federation().fill_out_prev_events(event)
+
+ vals = {
+ "topological_ordering": event.depth,
+ "event_id": event.event_id,
+ "type": event.type,
+ "room_id": event.room_id,
+ "content": json.dumps(event.content),
+ "processed": True,
+ }
+
+ if hasattr(event, "outlier"):
+ vals["outlier"] = event.outlier
+ else:
+ vals["outlier"] = False
+
+ if backfilled:
+ if not self.min_token_deferred.called:
+ yield self.min_token_deferred
+ self.min_token -= 1
+ vals["stream_ordering"] = self.min_token
+
+ unrec = {
+ k: v
+ for k, v in event.get_full_dict().items()
+ if k not in vals.keys()
+ }
+ vals["unrecognized_keys"] = json.dumps(unrec)
+
+ try:
+ yield self._simple_insert("events", vals)
+ except:
+ logger.exception(
+ "Failed to persist, probably duplicate: %s",
+ event.event_id
+ )
+ return
+
+ if not backfilled and hasattr(event, "state_key"):
+ vals = {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ }
+
+ if hasattr(event, "prev_state"):
+ vals["prev_state"] = event.prev_state
+
+ yield self._simple_insert("state_events", vals)
+
+ yield self._simple_insert(
+ "current_state_events",
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ }
)
- elif event.type == RoomConfigEvent.TYPE:
- if "visibility" in event.content:
- visibility = event.content["visibility"]
- return self.store_room_config(
- room_id=event.room_id,
- visibility=visibility
- )
+ latest = yield self.get_room_events_max_id()
+ defer.returnValue(latest)
+
+ @defer.inlineCallbacks
+ def get_current_state(self, room_id, event_type=None, state_key=""):
+ sql = (
+ "SELECT e.* FROM events as e "
+ "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
+ "INNER JOIN state_events as s ON e.event_id = s.event_id "
+ "WHERE c.room_id = ? "
+ )
+
+ if event_type:
+ sql += " AND s.type = ? AND s.state_key = ? "
+ args = (room_id, event_type, state_key)
else:
- raise NotImplementedError(
- "Don't know how to persist type=%s" % event.type
- )
+ args = (room_id, )
+
+ results = yield self._execute_and_decode(sql, *args)
+
+ defer.returnValue([self._parse_event_from_row(r) for r in results])
+
+ @defer.inlineCallbacks
+ def _get_min_token(self):
+ row = yield self._execute(
+ None,
+ "SELECT MIN(stream_ordering) FROM events"
+ )
+
+ self.min_token = row[0][0] if row and row[0] and row[0][0] else -1
+ self.min_token = min(self.min_token, -1)
+
+ logger.debug("min_token is: %s", self.min_token)
+
+ defer.returnValue(self.min_token)
def schema_path(schema):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index bf1800f4bf..75aab2d3b9 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
from twisted.internet import defer
@@ -20,6 +19,9 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
import collections
+import copy
+import json
+
logger = logging.getLogger(__name__)
@@ -29,6 +31,7 @@ class SQLBaseStore(object):
def __init__(self, hs):
self.hs = hs
self._db_pool = hs.get_db_pool()
+ self.event_factory = hs.get_event_factory()
self._clock = hs.get_clock()
def cursor_to_dict(self, cursor):
@@ -57,14 +60,22 @@ class SQLBaseStore(object):
The result of decoder(results)
"""
logger.debug(
- "[SQL] %s Args=%s Func=%s", query, args, decoder.__name__
+ "[SQL] %s Args=%s Func=%s",
+ query, args, decoder.__name__ if decoder else None
)
def interaction(txn):
cursor = txn.execute(query, args)
- return decoder(cursor)
+ if decoder:
+ return decoder(cursor)
+ else:
+ return cursor.fetchall()
+
return self._db_pool.runInteraction(interaction)
+ def _execute_and_decode(self, query, *args):
+ return self._execute(self.cursor_to_dict, query, *args)
+
# "Simple" SQL API methods that operate on a single table with no JOINs,
# no complex WHERE clauses, just a dict of values for columns.
@@ -281,6 +292,22 @@ class SQLBaseStore(object):
return self._db_pool.runInteraction(func)
+ def _parse_event_from_row(self, row_dict):
+ d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
+
+ d.pop("stream_ordering", None)
+ d.pop("topological_ordering", None)
+ d.pop("processed", None)
+
+ d.update(json.loads(row_dict["unrecognized_keys"]))
+ d["content"] = json.loads(d["content"])
+ del d["unrecognized_keys"]
+
+ return self.event_factory.create_event(
+ etype=d["type"],
+ **d
+ )
+
class Table(object):
""" A base class used to store information about a particular table.
diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py
index 9bd562c762..e60f98d1e1 100644
--- a/synapse/storage/feedback.py
+++ b/synapse/storage/feedback.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from twisted.internet import defer
+
from ._base import SQLBaseStore, Table
from synapse.api.events.room import FeedbackEvent
@@ -22,54 +24,28 @@ import json
class FeedbackStore(SQLBaseStore):
- def store_feedback(self, room_id, msg_id, msg_sender_id,
- fb_sender_id, fb_type, content):
- return self._simple_insert(FeedbackTable.table_name, dict(
- room_id=room_id,
- msg_id=msg_id,
- msg_sender_id=msg_sender_id,
- fb_sender_id=fb_sender_id,
- fb_type=fb_type,
- content=content,
- ))
-
- def get_feedback(self, room_id=None, msg_id=None, msg_sender_id=None,
- fb_sender_id=None, fb_type=None):
- query = FeedbackTable.select_statement(
- "msg_sender_id = ? AND room_id = ? AND msg_id = ? " +
- "AND fb_sender_id = ? AND feedback_type = ? " +
- "ORDER BY id DESC LIMIT 1")
- return self._execute(
- FeedbackTable.decode_single_result,
- query, msg_sender_id, room_id, msg_id, fb_sender_id, fb_type,
+ def _store_feedback(self, event):
+ return self._simple_insert("feedback", {
+ "event_id": event.event_id,
+ "feedback_type": event.feedback_type,
+ "room_id": event.room_id,
+ "target_event_id": event.target_event,
+ "sender": event.user_id,
+ })
+
+ @defer.inlineCallbacks
+ def get_feedback_for_event(self, event_id):
+ sql = (
+ "SELECT events.* FROM events INNER JOIN feedback "
+ "ON events.event_id = feedback.event_id "
+ "WHERE feedback.target_event_id = ? "
)
- def get_max_feedback_id(self):
- return self._simple_max_id(FeedbackTable.table_name)
-
-
-class FeedbackTable(Table):
- table_name = "feedback"
+ rows = yield self._execute_and_decode(sql, event_id)
- fields = [
- "id",
- "content",
- "feedback_type",
- "fb_sender_id",
- "msg_id",
- "room_id",
- "msg_sender_id"
- ]
-
- class EntryType(collections.namedtuple("FeedbackEntry", fields)):
-
- def as_event(self, event_factory):
- return event_factory.create_event(
- etype=FeedbackEvent.TYPE,
- room_id=self.room_id,
- msg_id=self.msg_id,
- msg_sender_id=self.msg_sender_id,
- user_id=self.fb_sender_id,
- feedback_type=self.feedback_type,
- content=json.loads(self.content),
- )
+ defer.returnValue(
+ [
+ self._parse_event_from_row(r)
+ for r in rows
+ ]
+ )
diff --git a/synapse/storage/message.py b/synapse/storage/message.py
deleted file mode 100644
index 7bb69c1384..0000000000
--- a/synapse/storage/message.py
+++ /dev/null
@@ -1,81 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014 matrix.org
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ._base import SQLBaseStore, Table
-from synapse.api.events.room import MessageEvent
-
-import collections
-import json
-
-
-class MessageStore(SQLBaseStore):
-
- def get_message(self, user_id, room_id, msg_id):
- """Get a message from the store.
-
- Args:
- user_id (str): The ID of the user who sent the message.
- room_id (str): The room the message was sent in.
- msg_id (str): The unique ID for this user/room combo.
- """
- query = MessagesTable.select_statement(
- "user_id = ? AND room_id = ? AND msg_id = ? " +
- "ORDER BY id DESC LIMIT 1")
- return self._execute(
- MessagesTable.decode_single_result,
- query, user_id, room_id, msg_id,
- )
-
- def store_message(self, user_id, room_id, msg_id, content):
- """Store a message in the store.
-
- Args:
- user_id (str): The ID of the user who sent the message.
- room_id (str): The room the message was sent in.
- msg_id (str): The unique ID for this user/room combo.
- content (str): The content of the message (JSON)
- """
- return self._simple_insert(MessagesTable.table_name, dict(
- user_id=user_id,
- room_id=room_id,
- msg_id=msg_id,
- content=content,
- ))
-
- def get_max_message_id(self):
- return self._simple_max_id(MessagesTable.table_name)
-
-
-class MessagesTable(Table):
- table_name = "messages"
-
- fields = [
- "id",
- "user_id",
- "room_id",
- "msg_id",
- "content"
- ]
-
- class EntryType(collections.namedtuple("MessageEntry", fields)):
-
- def as_event(self, event_factory):
- return event_factory.create_event(
- etype=MessageEvent.TYPE,
- room_id=self.room_id,
- user_id=self.user_id,
- msg_id=self.msg_id,
- content=json.loads(self.content),
- )
diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py
index 13adc581e1..7655f43ede 100644
--- a/synapse/storage/pdu.py
+++ b/synapse/storage/pdu.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from twisted.internet import defer
+
from ._base import SQLBaseStore, Table, JoinHelper
from synapse.util.logutils import log_function
@@ -319,6 +321,7 @@ class PduStore(SQLBaseStore):
return [(row[0], row[1], row[2]) for row in results]
+ @defer.inlineCallbacks
def get_oldest_pdus_in_context(self, context):
"""Get a list of Pdus that we haven't backfilled beyond yet (and haven't
seen). This list is used when we want to backfill backwards and is the
@@ -331,17 +334,14 @@ class PduStore(SQLBaseStore):
Returns:
list: A list of PduIdTuple.
"""
- return self._db_pool.runInteraction(
- self._get_oldest_pdus_in_context, context
- )
-
- def _get_oldest_pdus_in_context(self, txn, context):
- txn.execute(
+ results = yield self._execute(
+ None,
"SELECT pdu_id, origin FROM %(back)s WHERE context = ?"
% {"back": PduBackwardExtremitiesTable.table_name, },
- (context,)
+ context
)
- return [PduIdTuple(i, o) for i, o in txn.fetchall()]
+
+ defer.returnValue([PduIdTuple(i, o) for i, o in results])
def is_pdu_new(self, pdu_id, origin, context, depth):
"""For a given Pdu, try and figure out if it's 'new', i.e., if it's
@@ -580,7 +580,7 @@ class StatePduStore(SQLBaseStore):
txn.execute(query, query_args)
- def get_current_state(self, context, pdu_type, state_key):
+ def get_current_state_pdu(self, context, pdu_type, state_key):
"""For a given context, pdu_type, state_key 3-tuple, return what is
currently considered the current state.
@@ -595,10 +595,10 @@ class StatePduStore(SQLBaseStore):
"""
return self._db_pool.runInteraction(
- self._get_current_state, context, pdu_type, state_key
+ self._get_current_state_pdu, context, pdu_type, state_key
)
- def _get_current_state(self, txn, context, pdu_type, state_key):
+ def _get_current_state_pdu(self, txn, context, pdu_type, state_key):
return self._get_current_interaction(txn, context, pdu_type, state_key)
def _get_current_interaction(self, txn, context, pdu_type, state_key):
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index a97162831b..22f2dcca45 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -76,49 +76,80 @@ class RoomStore(SQLBaseStore):
)
@defer.inlineCallbacks
- def get_rooms(self, is_public, with_topics):
+ def get_rooms(self, is_public):
"""Retrieve a list of all public rooms.
Args:
is_public (bool): True if the rooms returned should be public.
- with_topics (bool): True to include the current topic for the room
- in the response.
Returns:
- A list of room dicts containing at least a "room_id" key, and a
- "topic" key if one is set and with_topic=True.
+ A list of room dicts containing at least a "room_id" key, a
+ "topic" key if one is set, and a "name" key if one is set
"""
- room_data_type = RoomTopicEvent.TYPE
- public = 1 if is_public else 0
-
- latest_topic = ("SELECT max(room_data.id) FROM room_data WHERE "
- + "room_data.type = ? GROUP BY room_id")
-
- query = ("SELECT rooms.*, room_data.content, room_alias FROM rooms "
- + "LEFT JOIN "
- + "room_aliases ON room_aliases.room_id = rooms.room_id "
- + "LEFT JOIN "
- + "room_data ON rooms.room_id = room_data.room_id WHERE "
- + "(room_data.id IN (" + latest_topic + ") "
- + "OR room_data.id IS NULL) AND rooms.is_public = ?")
-
- res = yield self._execute(
- self.cursor_to_dict, query, room_data_type, public
+
+ topic_subquery = (
+ "SELECT topics.event_id as event_id, "
+ "topics.room_id as room_id, topic "
+ "FROM topics "
+ "INNER JOIN current_state_events as c "
+ "ON c.event_id = topics.event_id "
)
- # return only the keys the specification expects
- ret_keys = ["room_id", "topic", "room_alias"]
+ name_subquery = (
+ "SELECT room_names.event_id as event_id, "
+ "room_names.room_id as room_id, name "
+ "FROM room_names "
+ "INNER JOIN current_state_events as c "
+ "ON c.event_id = room_names.event_id "
+ )
- # extract topic from the json (icky) FIXME
- for i, room_row in enumerate(res):
- try:
- content_json = json.loads(room_row["content"])
- room_row["topic"] = content_json["topic"]
- except:
- pass # no topic set
- # filter the dict based on ret_keys
- res[i] = {k: v for k, v in room_row.iteritems() if k in ret_keys}
+ # We use non printing ascii character US () as a seperator
+ sql = (
+ "SELECT r.room_id, n.name, t.topic, "
+ "group_concat(a.room_alias, '') "
+ "FROM rooms AS r "
+ "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id "
+ "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id "
+ "INNER JOIN room_aliases AS a ON a.room_id = r.room_id "
+ "WHERE r.is_public = ? "
+ "GROUP BY r.room_id "
+ ) % {
+ "topic": topic_subquery,
+ "name": name_subquery,
+ }
+
+ rows = yield self._execute(None, sql, is_public)
+
+ ret = [
+ {
+ "room_id": r[0],
+ "name": r[1],
+ "topic": r[2],
+ "aliases": r[3].split(""),
+ }
+ for r in rows
+ ]
+
+ defer.returnValue(ret)
+
+ def _store_room_topic(self, event):
+ return self._simple_insert(
+ "topics",
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "topic": event.topic,
+ }
+ )
- defer.returnValue(res)
+ def _store_room_name(self, event):
+ return self._simple_insert(
+ "room_names",
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "name": event.name,
+ }
+ )
class RoomsTable(Table):
diff --git a/synapse/storage/roomdata.py b/synapse/storage/roomdata.py
deleted file mode 100644
index cc04d1ba14..0000000000
--- a/synapse/storage/roomdata.py
+++ /dev/null
@@ -1,85 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014 matrix.org
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ._base import SQLBaseStore, Table
-
-import collections
-import json
-
-
-class RoomDataStore(SQLBaseStore):
-
- """Provides various CRUD operations for Room Events. """
-
- def get_room_data(self, room_id, etype, state_key=""):
- """Retrieve the data stored under this type and state_key.
-
- Args:
- room_id (str)
- etype (str)
- state_key (str)
- Returns:
- namedtuple: Or None if nothing exists at this path.
- """
- query = RoomDataTable.select_statement(
- "room_id = ? AND type = ? AND state_key = ? "
- "ORDER BY id DESC LIMIT 1"
- )
- return self._execute(
- RoomDataTable.decode_single_result,
- query, room_id, etype, state_key,
- )
-
- def store_room_data(self, room_id, etype, state_key="", content=None):
- """Stores room specific data.
-
- Args:
- room_id (str)
- etype (str)
- state_key (str)
- data (str)- The data to store for this path in JSON.
- Returns:
- The store ID for this data.
- """
- return self._simple_insert(RoomDataTable.table_name, dict(
- etype=etype,
- state_key=state_key,
- room_id=room_id,
- content=content,
- ))
-
- def get_max_room_data_id(self):
- return self._simple_max_id(RoomDataTable.table_name)
-
-
-class RoomDataTable(Table):
- table_name = "room_data"
-
- fields = [
- "id",
- "room_id",
- "type",
- "state_key",
- "content"
- ]
-
- class EntryType(collections.namedtuple("RoomDataEntry", fields)):
-
- def as_event(self, event_factory):
- return event_factory.create_event(
- etype=self.type,
- room_id=self.room_id,
- content=json.loads(self.content),
- )
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index c45d128f1b..89c87290cf 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -31,6 +31,38 @@ logger = logging.getLogger(__name__)
class RoomMemberStore(SQLBaseStore):
+ @defer.inlineCallbacks
+ def _store_room_member(self, event):
+ """Store a room member in the database.
+ """
+ domain = self.hs.parse_userid(event.target_user_id).domain
+
+ yield self._simple_insert(
+ "room_memberships",
+ {
+ "event_id": event.event_id,
+ "user_id": event.target_user_id,
+ "sender": event.user_id,
+ "room_id": event.room_id,
+ "membership": event.membership,
+ }
+ )
+
+ # Update room hosts table
+ if event.membership == Membership.JOIN:
+ sql = (
+ "INSERT OR IGNORE INTO room_hosts (room_id, host) "
+ "VALUES (?, ?)"
+ )
+ yield self._execute(None, sql, event.room_id, domain)
+ else:
+ sql = (
+ "DELETE FROM room_hosts WHERE room_id = ? AND host = ?"
+ )
+
+ yield self._execute(None, sql, event.room_id, domain)
+
+ @defer.inlineCallbacks
def get_room_member(self, user_id, room_id):
"""Retrieve the current state of a room member.
@@ -38,36 +70,15 @@ class RoomMemberStore(SQLBaseStore):
user_id (str): The member's user ID.
room_id (str): The room the member is in.
Returns:
- namedtuple: The room member from the database, or None if this
- member does not exist.
+ Deferred: Results in a MembershipEvent or None.
"""
- query = RoomMemberTable.select_statement(
- "room_id = ? AND user_id = ? ORDER BY id DESC LIMIT 1")
- return self._execute(
- RoomMemberTable.decode_single_result,
- query, room_id, user_id,
- )
+ rows = yield self._get_members_by_dict({
+ "e.room_id": room_id,
+ "m.user_id": user_id,
+ })
- def store_room_member(self, user_id, sender, room_id, membership, content):
- """Store a room member in the database.
+ defer.returnValue(rows[0] if rows else None)
- Args:
- user_id (str): The member's user ID.
- room_id (str): The room in relation to the member.
- membership (synapse.api.constants.Membership): The new membership
- state.
- content (dict): The content of the membership (JSON).
- """
- content_json = json.dumps(content)
- return self._simple_insert(RoomMemberTable.table_name, dict(
- user_id=user_id,
- sender=sender,
- room_id=room_id,
- membership=membership,
- content=content_json,
- ))
-
- @defer.inlineCallbacks
def get_room_members(self, room_id, membership=None):
"""Retrieve the current room member list for a room.
@@ -79,17 +90,12 @@ class RoomMemberStore(SQLBaseStore):
Returns:
list of namedtuples representing the members in this room.
"""
- query = RoomMemberTable.select_statement(
- "id IN (SELECT MAX(id) FROM " + RoomMemberTable.table_name
- + " WHERE room_id = ? GROUP BY user_id)"
- )
- res = yield self._execute(
- RoomMemberTable.decode_results, query, room_id,
- )
- # strip memberships which don't match
+
+ where = {"m.room_id": room_id}
if membership:
- res = [entry for entry in res if entry.membership == membership]
- defer.returnValue(res)
+ where["m.membership"] = membership
+
+ return self._get_members_by_dict(where)
def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
""" Get all the rooms for this user where the membership for this user
@@ -106,70 +112,40 @@ class RoomMemberStore(SQLBaseStore):
return defer.succeed(None)
args = [user_id]
- membership_placeholder = ["membership=?"] * len(membership_list)
- where_membership = "(" + " OR ".join(membership_placeholder) + ")"
- for membership in membership_list:
- args.append(membership)
-
- # sub-select finds the row ID for the most recent (i.e. current)
- # state change of this user per room, then the outer select finds those
- query = ("SELECT room_id, membership FROM room_memberships"
- + " WHERE id IN (SELECT MAX(id) FROM room_memberships"
- + " WHERE user_id=? GROUP BY room_id)"
- + " AND " + where_membership)
- return self._execute(
- self.cursor_to_dict, query, *args
- )
+ args.extend(membership_list)
- @defer.inlineCallbacks
- def get_joined_hosts_for_room(self, room_id):
- query = RoomMemberTable.select_statement(
- "id IN (SELECT MAX(id) FROM " + RoomMemberTable.table_name
- + " WHERE room_id = ? GROUP BY user_id)"
- )
-
- res = yield self._execute(
- RoomMemberTable.decode_results, query, room_id,
+ where_clause = "user_id = ? AND (%s)" % (
+ " OR ".join(["membership = ?" for _ in membership_list]),
)
- def host_from_user_id_string(user_id):
- domain = UserID.from_string(entry.user_id, self.hs).domain
- return domain
-
- # strip memberships which don't match
- hosts = [
- host_from_user_id_string(entry.user_id)
- for entry in res
- if entry.membership == Membership.JOIN
- ]
+ return self._get_members_query(where_clause, args)
- logger.debug("Returning hosts: %s from results: %s", hosts, res)
-
- defer.returnValue(hosts)
-
- def get_max_room_member_id(self):
- return self._simple_max_id(RoomMemberTable.table_name)
-
-
-class RoomMemberTable(Table):
- table_name = "room_memberships"
-
- fields = [
- "id",
- "user_id",
- "sender",
- "room_id",
- "membership",
- "content"
- ]
+ def get_joined_hosts_for_room(self, room_id):
+ return self._simple_select_onecol(
+ "room_hosts",
+ {"room_id": room_id},
+ "host"
+ )
- class EntryType(collections.namedtuple("RoomMemberEntry", fields)):
+ def _get_members_by_dict(self, where_dict):
+ clause = " AND ".join("%s = ?" % k for k in where_dict.keys())
+ vals = where_dict.values()
+ return self._get_members_query(clause, vals)
- def as_event(self, event_factory):
- return event_factory.create_event(
- etype=RoomMemberEvent.TYPE,
- room_id=self.room_id,
- target_user_id=self.user_id,
- user_id=self.sender,
- content=json.loads(self.content),
- )
+ @defer.inlineCallbacks
+ def _get_members_query(self, where_clause, where_values):
+ sql = (
+ "SELECT e.* FROM events as e "
+ "INNER JOIN room_memberships as m "
+ "ON e.event_id = m.event_id "
+ "INNER JOIN current_state_events as c "
+ "ON m.event_id = c.event_id "
+ "WHERE %s "
+ ) % (where_clause,)
+
+ rows = yield self._execute_and_decode(sql, *where_values)
+
+ logger.debug("_get_members_query Got rows %s", rows)
+
+ results = [self._parse_event_from_row(r) for r in rows]
+ defer.returnValue(results)
diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql
index 77096546b2..39a1ed703e 100644
--- a/synapse/storage/schema/im.sql
+++ b/synapse/storage/schema/im.sql
@@ -12,43 +12,71 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-CREATE TABLE IF NOT EXISTS rooms(
- room_id TEXT PRIMARY KEY NOT NULL,
- is_public INTEGER,
- creator TEXT
+
+CREATE TABLE IF NOT EXISTS events(
+ stream_ordering INTEGER PRIMARY KEY AUTOINCREMENT,
+ topological_ordering INTEGER NOT NULL,
+ event_id TEXT NOT NULL,
+ type TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ content TEXT NOT NULL,
+ unrecognized_keys TEXT,
+ processed BOOL NOT NULL,
+ outlier BOOL NOT NULL,
+ CONSTRAINT ev_uniq UNIQUE (event_id)
);
-CREATE TABLE IF NOT EXISTS room_memberships(
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- user_id TEXT NOT NULL, -- no foreign key to users table, it could be an id belonging to another home server
- sender TEXT NOT NULL,
+CREATE TABLE IF NOT EXISTS state_events(
+ event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
- membership TEXT NOT NULL,
- content TEXT NOT NULL
+ type TEXT NOT NULL,
+ state_key TEXT NOT NULL,
+ prev_state TEXT
);
-CREATE TABLE IF NOT EXISTS messages(
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- user_id TEXT,
- room_id TEXT,
- msg_id TEXT,
- content TEXT
+CREATE TABLE IF NOT EXISTS current_state_events(
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ type TEXT NOT NULL,
+ state_key TEXT NOT NULL,
+ CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
+);
+
+CREATE TABLE IF NOT EXISTS room_memberships(
+ event_id TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ sender TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ membership TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS feedback(
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- content TEXT,
+ event_id TEXT NOT NULL,
feedback_type TEXT,
- fb_sender_id TEXT,
- msg_id TEXT,
- room_id TEXT,
- msg_sender_id TEXT
+ target_event_id TEXT,
+ sender TEXT,
+ room_id TEXT
);
-CREATE TABLE IF NOT EXISTS room_data(
- id INTEGER PRIMARY KEY AUTOINCREMENT,
+CREATE TABLE IF NOT EXISTS topics(
+ event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
- type TEXT NOT NULL,
- state_key TEXT NOT NULL,
- content TEXT
+ topic TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS room_names(
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ name TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS rooms(
+ room_id TEXT PRIMARY KEY NOT NULL,
+ is_public INTEGER,
+ creator TEXT
+);
+
+CREATE TABLE IF NOT EXISTS room_hosts(
+ room_id TEXT NOT NULL,
+ host TEXT NOT NULL
);
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 47a1f2c45a..87ae961ccd 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -13,267 +13,287 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+""" This module is responsible for getting events from the DB for pagination
+and event streaming.
+
+The order it returns events in depend on whether we are streaming forwards or
+are paginating backwards. We do this because we want to handle out of order
+messages nicely, while still returning them in the correct order when we
+paginate bacwards.
+
+This is implemented by keeping two ordering columns: stream_ordering and
+topological_ordering. Stream ordering is basically insertion/received order
+(except for events from backfill requests). The topolgical_ordering is a
+weak ordering of events based on the pdu graph.
+
+This means that we have to have two different types of tokens, depending on
+what sort order was used:
+ - stream tokens are of the form: "s%d", which maps directly to the column
+ - topological tokems: "t%d-%d", where the integers map to the topological
+ and stream ordering columns respectively.
+"""
+
+from twisted.internet import defer
from ._base import SQLBaseStore
-from .message import MessagesTable
-from .feedback import FeedbackTable
-from .roomdata import RoomDataTable
-from .roommember import RoomMemberTable
+from synapse.api.errors import SynapseError
+from synapse.api.constants import Membership
+from synapse.util.logutils import log_function
import json
import logging
+
logger = logging.getLogger(__name__)
+MAX_STREAM_SIZE = 1000
+
+
+_STREAM_TOKEN = "stream"
+_TOPOLOGICAL_TOKEN = "topological"
+
+
+def _parse_stream_token(string):
+ try:
+ if string[0] != 's':
+ raise
+ return int(string[1:])
+ except:
+ raise SynapseError(400, "Invalid token")
+
+
+def _parse_topological_token(string):
+ try:
+ if string[0] != 't':
+ raise
+ parts = string[1:].split('-', 1)
+ return (int(parts[0]), int(parts[1]))
+ except:
+ raise SynapseError(400, "Invalid token")
+
+
+def is_stream_token(string):
+ try:
+ _parse_stream_token(string)
+ return True
+ except:
+ return False
+
+
+def is_topological_token(string):
+ try:
+ _parse_topological_token(string)
+ return True
+ except:
+ return False
+
+
+def _get_token_bound(token, comparison):
+ try:
+ s = _parse_stream_token(token)
+ return "%s %s %d" % ("stream_ordering", comparison, s)
+ except:
+ pass
+
+ try:
+ top, stream = _parse_topological_token(token)
+ return "%s %s %d AND %s %s %d" % (
+ "topological_ordering", comparison, top,
+ "stream_ordering", comparison, stream,
+ )
+ except:
+ pass
+
+ raise SynapseError(400, "Invalid token")
+
+
class StreamStore(SQLBaseStore):
+ @log_function
+ def get_room_events(self, user_id, from_key, to_key, room_id, limit=0,
+ direction='f', with_feedback=False):
+ # We deal with events request in two different ways depending on if
+ # this looks like an /events request or a pagination request.
+ is_events = (
+ direction == 'f'
+ and user_id
+ and is_stream_token(from_key)
+ and to_key and is_stream_token(to_key)
+ )
- def get_message_stream(self, user_id, from_key, to_key, room_id, limit=0,
- with_feedback=False):
- """Get all messages for this user between the given keys.
-
- Args:
- user_id (str): The user who is requesting messages.
- from_key (int): The ID to start returning results from (exclusive).
- to_key (int): The ID to stop returning results (exclusive).
- room_id (str): Gets messages only for this room. Can be None, in
- which case all room messages will be returned.
- Returns:
- A tuple of rows (list of namedtuples), new_id(int)
- """
- if with_feedback and room_id: # with fb MUST specify a room ID
- return self._db_pool.runInteraction(
- self._get_message_rows_with_feedback,
- user_id, from_key, to_key, room_id, limit
+ if is_events:
+ return self.get_room_events_stream(
+ user_id=user_id,
+ from_key=from_key,
+ to_key=to_key,
+ room_id=room_id,
+ limit=limit,
+ with_feedback=with_feedback,
)
else:
- return self._db_pool.runInteraction(
- self._get_message_rows,
- user_id, from_key, to_key, room_id, limit
+ return self.paginate_room_events(
+ from_key=from_key,
+ to_key=to_key,
+ room_id=room_id,
+ limit=limit,
+ with_feedback=with_feedback,
)
- def _get_message_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
- limit):
- # work out which rooms this user is joined in on and join them with
- # the room id on the messages table, bounded by the specified pkeys
+ @defer.inlineCallbacks
+ @log_function
+ def get_room_events_stream(self, user_id, from_key, to_key, room_id,
+ limit=0, with_feedback=False):
+ # TODO (erikj): Handle compressed feedback
- # get all messages where the *current* membership state is 'join' for
- # this user in that room.
- query = ("SELECT messages.* FROM messages WHERE ? IN"
- + " (SELECT membership from room_memberships WHERE user_id=?"
- + " AND room_id = messages.room_id ORDER BY id DESC LIMIT 1)")
- query_args = ["join", user_id]
+ current_room_membership_sql = (
+ "SELECT m.room_id FROM room_memberships as m "
+ "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
+ "WHERE m.user_id = ?"
+ )
- if room_id:
- query += " AND messages.room_id=?"
- query_args.append(room_id)
+ # We also want to get any membership events about that user, e.g.
+ # invites or leave notifications.
+ membership_sql = (
+ "SELECT m.event_id FROM room_memberships as m "
+ "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
+ "WHERE m.user_id = ? "
+ )
- (query, query_args) = self._append_stream_operations(
- "messages", query, query_args, from_pkey, to_pkey, limit=limit
+ if limit:
+ limit = max(limit, MAX_STREAM_SIZE)
+ else:
+ limit = MAX_STREAM_SIZE
+
+ # From and to keys should be integers from ordering.
+ from_id = _parse_stream_token(from_key)
+ to_id = _parse_stream_token(to_key)
+
+ if from_key == to_key:
+ defer.returnValue(([], to_key))
+ return
+
+ sql = (
+ "SELECT * FROM events as e WHERE "
+ "((room_id IN (%(current)s)) OR "
+ "(event_id IN (%(invites)s))) "
+ "AND e.stream_ordering > ? AND e.stream_ordering < ? "
+ "AND e.outlier = 0 "
+ "ORDER BY stream_ordering ASC LIMIT %(limit)d "
+ ) % {
+ "current": current_room_membership_sql,
+ "invites": membership_sql,
+ "limit": limit
+ }
+
+ rows = yield self._execute_and_decode(
+ sql,
+ user_id, user_id, from_id, to_id
)
- cursor = txn.execute(query, query_args)
- return self._as_events(cursor, MessagesTable, from_pkey)
+ ret = [self._parse_event_from_row(r) for r in rows]
- def _get_message_rows_with_feedback(self, txn, user_id, from_pkey, to_pkey,
- room_id, limit):
- # this col represents the compressed feedback JSON as per spec
- compressed_feedback_col = (
- "'[' || group_concat('{\"sender_id\":\"' || f.fb_sender_id"
- + " || '\",\"feedback_type\":\"' || f.feedback_type"
- + " || '\",\"content\":' || f.content || '}') || ']'"
- )
+ if rows:
+ key = "s%d" % max([r["stream_ordering"] for r in rows])
+ else:
+ # Assume we didn't get anything because there was nothing to get.
+ key = to_key
+
+ defer.returnValue((ret, key))
- global_msg_id_join = ("f.room_id = messages.room_id"
- + " and f.msg_id = messages.msg_id"
- + " and messages.user_id = f.msg_sender_id")
+ @defer.inlineCallbacks
+ @log_function
+ def paginate_room_events(self, room_id, from_key, to_key=None,
+ direction='b', limit=-1,
+ with_feedback=False):
+ # TODO (erikj): Handle compressed feedback
- select_query = (
- "SELECT messages.*, f.content AS fb_content, f.fb_sender_id"
- + ", " + compressed_feedback_col + " AS compressed_fb"
- + " FROM messages LEFT JOIN feedback f ON " + global_msg_id_join)
+ from_comp = '<' if direction =='b' else '>'
+ to_comp = '>' if direction =='b' else '<'
+ order = "DESC" if direction == 'b' else "ASC"
- current_membership_sub_query = (
- "(SELECT membership from room_memberships rm"
- + " WHERE user_id=? AND room_id = rm.room_id"
- + " ORDER BY id DESC LIMIT 1)")
+ args = [room_id]
- where = (" WHERE ? IN " + current_membership_sub_query
- + " AND messages.room_id=?")
+ bounds = _get_token_bound(from_key, from_comp)
+ if to_key:
+ bounds = "%s AND %s" % (bounds, _get_token_bound(to_key, to_comp))
+
+ if int(limit) > 0:
+ args.append(int(limit))
+ limit_str = " LIMIT ?"
+ else:
+ limit_str = ""
- query = select_query + where
- query_args = ["join", user_id, room_id]
+ sql = (
+ "SELECT * FROM events "
+ "WHERE outlier = 0 AND room_id = ? AND %(bounds)s "
+ "ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s "
+ ) % {"bounds": bounds, "order": order, "limit": limit_str}
- (query, query_args) = self._append_stream_operations(
- "messages", query, query_args, from_pkey, to_pkey,
- limit=limit, group_by=" GROUP BY messages.id "
+ rows = yield self._execute_and_decode(
+ sql,
+ *args
)
- cursor = txn.execute(query, query_args)
-
- # convert the result set into events
- entries = self.cursor_to_dict(cursor)
- events = []
- for entry in entries:
- # TODO we should spec the cursor > event mapping somewhere else.
- event = {}
- straight_mappings = ["msg_id", "user_id", "room_id"]
- for key in straight_mappings:
- event[key] = entry[key]
- event["content"] = json.loads(entry["content"])
- if entry["compressed_fb"]:
- event["feedback"] = json.loads(entry["compressed_fb"])
- events.append(event)
-
- latest_pkey = from_pkey if len(entries) == 0 else entries[-1]["id"]
-
- return (events, latest_pkey)
-
- def get_room_member_stream(self, user_id, from_key, to_key):
- """Get all room membership events for this user between the given keys.
-
- Args:
- user_id (str): The user who is requesting membership events.
- from_key (int): The ID to start returning results from (exclusive).
- to_key (int): The ID to stop returning results (exclusive).
- Returns:
- A tuple of rows (list of namedtuples), new_id(int)
- """
- return self._db_pool.runInteraction(
- self._get_room_member_rows, user_id, from_key, to_key
+ if rows:
+ topo = rows[-1]["topological_ordering"]
+ toke = rows[-1]["stream_ordering"]
+ next_token = "t%s-%s" % (topo, toke)
+ else:
+ # TODO (erikj): We should work out what to do here instead.
+ next_token = to_key if to_key else from_key
+
+ defer.returnValue(
+ (
+ [self._parse_event_from_row(r) for r in rows],
+ next_token
+ )
)
- def _get_room_member_rows(self, txn, user_id, from_pkey, to_pkey):
- # get all room membership events for rooms which the user is
- # *currently* joined in on, or all invite events for this user.
- current_membership_sub_query = (
- "(SELECT membership FROM room_memberships"
- + " WHERE user_id=? AND room_id = rm.room_id"
- + " ORDER BY id DESC LIMIT 1)")
-
- query = ("SELECT rm.* FROM room_memberships rm "
- # all membership events for rooms you've currently joined.
- + " WHERE (? IN " + current_membership_sub_query
- # all invite membership events for this user
- + " OR rm.membership=? AND user_id=?)"
- + " AND rm.id > ?")
- query_args = ["join", user_id, "invite", user_id, from_pkey]
-
- if to_pkey != -1:
- query += " AND rm.id < ?"
- query_args.append(to_pkey)
-
- cursor = txn.execute(query, query_args)
- return self._as_events(cursor, RoomMemberTable, from_pkey)
-
- def get_feedback_stream(self, user_id, from_key, to_key, room_id, limit=0):
- return self._db_pool.runInteraction(
- self._get_feedback_rows,
- user_id, from_key, to_key, room_id, limit
+ @defer.inlineCallbacks
+ def get_recent_events_for_room(self, room_id, limit, end_token,
+ with_feedback=False):
+ # TODO (erikj): Handle compressed feedback
+
+ sql = (
+ "SELECT * FROM events "
+ "WHERE room_id = ? AND stream_ordering <= ? "
+ "ORDER BY topological_ordering, stream_ordering DESC LIMIT ? "
)
- def _get_feedback_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
- limit):
- # work out which rooms this user is joined in on and join them with
- # the room id on the feedback table, bounded by the specified pkeys
-
- # get all messages where the *current* membership state is 'join' for
- # this user in that room.
- query = (
- "SELECT feedback.* FROM feedback WHERE ? IN "
- + "(SELECT membership from room_memberships WHERE user_id=?"
- + " AND room_id = feedback.room_id ORDER BY id DESC LIMIT 1)")
- query_args = ["join", user_id]
-
- if room_id:
- query += " AND feedback.room_id=?"
- query_args.append(room_id)
-
- (query, query_args) = self._append_stream_operations(
- "feedback", query, query_args, from_pkey, to_pkey, limit=limit
+ rows = yield self._execute_and_decode(
+ sql,
+ room_id, end_token, limit
)
- cursor = txn.execute(query, query_args)
- return self._as_events(cursor, FeedbackTable, from_pkey)
+ rows.reverse() # As we selected with reverse ordering
+
+ if rows:
+ topo = rows[0]["topological_ordering"]
+ toke = rows[0]["stream_ordering"]
+ start_token = "t%s-%s" % (topo, toke)
- def get_room_data_stream(self, user_id, from_key, to_key, room_id,
- limit=0):
- return self._db_pool.runInteraction(
- self._get_room_data_rows,
- user_id, from_key, to_key, room_id, limit
+ token = (start_token, end_token)
+ else:
+ token = (end_token, end_token)
+
+ defer.returnValue(
+ (
+ [self._parse_event_from_row(r) for r in rows],
+ token
+ )
)
- def _get_room_data_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
- limit):
- # work out which rooms this user is joined in on and join them with
- # the room id on the feedback table, bounded by the specified pkeys
-
- # get all messages where the *current* membership state is 'join' for
- # this user in that room.
- query = (
- "SELECT room_data.* FROM room_data WHERE ? IN "
- + "(SELECT membership from room_memberships WHERE user_id=?"
- + " AND room_id = room_data.room_id ORDER BY id DESC LIMIT 1)")
- query_args = ["join", user_id]
-
- if room_id:
- query += " AND room_data.room_id=?"
- query_args.append(room_id)
-
- (query, query_args) = self._append_stream_operations(
- "room_data", query, query_args, from_pkey, to_pkey, limit=limit
+ @defer.inlineCallbacks
+ def get_room_events_max_id(self):
+ res = yield self._execute_and_decode(
+ "SELECT MAX(stream_ordering) as m FROM events"
)
- cursor = txn.execute(query, query_args)
- return self._as_events(cursor, RoomDataTable, from_pkey)
-
- def _append_stream_operations(self, table_name, query, query_args,
- from_pkey, to_pkey, limit=None,
- group_by=""):
- LATEST_ROW = -1
- order_by = ""
- if to_pkey > from_pkey:
- if from_pkey != LATEST_ROW:
- # e.g. from=5 to=9 >> from 5 to 9 >> id>5 AND id<9
- query += (" AND %s.id > ? AND %s.id < ?" %
- (table_name, table_name))
- query_args.append(from_pkey)
- query_args.append(to_pkey)
- else:
- # e.g. from=-1 to=5 >> from now to 5 >> id>5 ORDER BY id DESC
- query += " AND %s.id > ? " % table_name
- order_by = "ORDER BY id DESC"
- query_args.append(to_pkey)
- elif from_pkey > to_pkey:
- if to_pkey != LATEST_ROW:
- # from=9 to=5 >> from 9 to 5 >> id>5 AND id<9 ORDER BY id DESC
- query += (" AND %s.id > ? AND %s.id < ? " %
- (table_name, table_name))
- order_by = "ORDER BY id DESC"
- query_args.append(to_pkey)
- query_args.append(from_pkey)
- else:
- # from=5 to=-1 >> from 5 to now >> id>5
- query += " AND %s.id > ?" % table_name
- query_args.append(from_pkey)
-
- query += group_by + order_by
-
- if limit and limit > 0:
- query += " LIMIT ?"
- query_args.append(str(limit))
-
- return (query, query_args)
-
- def _as_events(self, cursor, table, from_pkey):
- data_entries = table.decode_results(cursor)
- last_pkey = from_pkey
- if data_entries:
- last_pkey = data_entries[-1].id
-
- events = [
- entry.as_event(self.event_factory).get_dict()
- for entry in data_entries
- ]
-
- return (events, last_pkey)
+ logger.debug("get_room_events_max_id: %s", res)
+
+ if not res or not res[0] or not res[0]["m"]:
+ defer.returnValue("s1")
+ return
+
+ key = res[0]["m"] + 1
+ defer.returnValue("s%d" % (key,))
diff --git a/synapse/types.py b/synapse/types.py
index b8e191bb3c..fd6a3d1d72 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -38,6 +38,14 @@ class DomainSpecificString(
def __iter__(self):
raise ValueError("Attempted to iterate a %s" % (type(self).__name__))
+ # Because this class is a namedtuple of strings and booleans, it is deeply
+ # immutable.
+ def __copy__(self):
+ return self
+
+ def __deepcopy__(self, memo):
+ return self
+
@classmethod
def from_string(cls, s, hs):
"""Parse the string given by 's' into a structure object."""
|