Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/api/events/factory.py | 10
-rw-r--r-- | synapse/api/events/room.py | 30
-rw-r--r-- | synapse/api/notifier.py | 3
-rw-r--r-- | synapse/api/streams/__init__.py | 19
-rw-r--r-- | synapse/api/streams/event.py | 173
-rw-r--r-- | synapse/federation/handler.py | 26
-rw-r--r-- | synapse/federation/replication.py | 9
-rw-r--r-- | synapse/handlers/events.py | 8
-rw-r--r-- | synapse/handlers/federation.py | 21
-rw-r--r-- | synapse/handlers/presence.py | 2
-rw-r--r-- | synapse/handlers/room.py | 102
-rw-r--r-- | synapse/rest/room.py | 59
-rw-r--r-- | synapse/state.py | 2
-rw-r--r-- | synapse/storage/__init__.py | 179
-rw-r--r-- | synapse/storage/_base.py | 28
-rw-r--r-- | synapse/storage/feedback.py | 72
-rw-r--r-- | synapse/storage/message.py | 81
-rw-r--r-- | synapse/storage/pdu.py | 22
-rw-r--r-- | synapse/storage/room.py | 97
-rw-r--r-- | synapse/storage/roomdata.py | 85
-rw-r--r-- | synapse/storage/roommember.py | 172
-rw-r--r-- | synapse/storage/schema/im.sql | 81
-rw-r--r-- | synapse/storage/stream.py | 445
23 files changed, 857 insertions, 869 deletions
diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py index 12aa04fc6e..b61dac7acd 100644 --- a/synapse/api/events/factory.py +++ b/synapse/api/events/factory.py @@ -15,7 +15,7 @@ from synapse.api.events.room import ( RoomTopicEvent, MessageEvent, RoomMemberEvent, FeedbackEvent, - InviteJoinEvent, RoomConfigEvent + InviteJoinEvent, RoomConfigEvent, RoomNameEvent, GenericEvent, ) from synapse.util.stringutils import random_string @@ -25,6 +25,7 @@ class EventFactory(object): _event_classes = [ RoomTopicEvent, + RoomNameEvent, MessageEvent, RoomMemberEvent, FeedbackEvent, @@ -42,10 +43,9 @@ class EventFactory(object): if "event_id" not in kwargs: kwargs["event_id"] = random_string(10) - try: + if etype in self._event_list: handler = self._event_list[etype] - except KeyError: # unknown event type - # TODO allow custom event types. - raise NotImplementedError("Unknown etype=%s" % etype) + else: + handler = GenericEvent return handler(**kwargs) diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py index f3df849af2..42459f3f21 100644 --- a/synapse/api/events/room.py +++ b/synapse/api/events/room.py @@ -16,17 +16,45 @@ from . import SynapseEvent +class GenericEvent(SynapseEvent): + def get_content_template(self): + return {} + + class RoomTopicEvent(SynapseEvent): TYPE = "m.room.topic" + internal_keys = SynapseEvent.internal_keys + [ + "topic", + ] + def __init__(self, **kwargs): kwargs["state_key"] = "" + if "topic" in kwargs["content"]: + kwargs["topic"] = kwargs["content"]["topic"] super(RoomTopicEvent, self).__init__(**kwargs) def get_content_template(self): return {"topic": u"string"} +class RoomNameEvent(SynapseEvent): + TYPE = "m.room.name" + + internal_keys = SynapseEvent.internal_keys + [ + "name", + ] + + def __init__(self, **kwargs): + kwargs["state_key"] = "" + if "name" in kwargs["content"]: + kwargs["name"] = kwargs["content"]["name"] + super(RoomNameEvent, self).__init__(**kwargs) + + def get_content_template(self): + return {"name": u"string"} + + class RoomMemberEvent(SynapseEvent): TYPE = "m.room.member" @@ -38,6 +66,8 @@ class RoomMemberEvent(SynapseEvent): def __init__(self, **kwargs): if "target_user_id" in kwargs: kwargs["state_key"] = kwargs["target_user_id"] + if "membership" not in kwargs: + kwargs["membership"] = kwargs.get("content", {}).get("membership") super(RoomMemberEvent, self).__init__(**kwargs) def get_content_template(self): diff --git a/synapse/api/notifier.py b/synapse/api/notifier.py index 65b5a4ebb3..9f622df6bb 100644 --- a/synapse/api/notifier.py +++ b/synapse/api/notifier.py @@ -15,6 +15,7 @@ from synapse.api.constants import Membership from synapse.api.events.room import RoomMemberEvent +from synapse.api.streams.event import EventsStreamData from twisted.internet import defer from twisted.internet import reactor @@ -66,7 +67,7 @@ class Notifier(object): self._notify_and_callback( user_id=user_id, event_data=event.get_dict(), - stream_type=event.type, + stream_type=EventsStreamData.EVENT_TYPE, store_id=store_id) def on_new_user_event(self, user_id, event_data, stream_type, store_id): diff --git a/synapse/api/streams/__init__.py b/synapse/api/streams/__init__.py index 989e63f9ec..44f4cc6078 100644 --- a/synapse/api/streams/__init__.py +++ b/synapse/api/streams/__init__.py @@ -20,23 +20,23 @@ class PaginationConfig(object): """A configuration object which stores pagination parameters.""" - def __init__(self, from_tok=None, to_tok=None, limit=0): + def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0): 
self.from_tok = from_tok self.to_tok = to_tok + self.direction = direction self.limit = limit @classmethod def from_request(cls, request, raise_invalid_params=True): params = { - "from_tok": PaginationStream.TOK_START, - "to_tok": PaginationStream.TOK_END, - "limit": 0 + "direction": 'f', } query_param_mappings = [ # 3-tuple of qp_key, attribute, rules ("from", "from_tok", lambda x: type(x) == str), ("to", "to_tok", lambda x: type(x) == str), - ("limit", "limit", lambda x: x.isdigit()) + ("limit", "limit", lambda x: x.isdigit()), + ("dir", "direction", lambda x: x == 'f' or x == 'b'), ] for qp, attr, is_valid in query_param_mappings: @@ -48,12 +48,17 @@ class PaginationConfig(object): return PaginationConfig(**params) + def __str__(self): + return ( + "<PaginationConfig from_tok=%s, to_tok=%s, " + "direction=%s, limit=%s>" + ) % (self.from_tok, self.to_tok, self.direction, self.limit) + class PaginationStream(object): """ An interface for streaming data as chunks. """ - TOK_START = "START" TOK_END = "END" def get_chunk(self, config=None): @@ -76,7 +81,7 @@ class StreamData(object): self.hs = hs self.store = hs.get_datastore() - def get_rows(self, user_id, from_pkey, to_pkey, limit): + def get_rows(self, user_id, from_pkey, to_pkey, limit, direction): """ Get event stream data between the specified pkeys. Args: diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py index 4b6d739e54..a5c8b2b31f 100644 --- a/synapse/api/streams/event.py +++ b/synapse/api/streams/event.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.api.errors import EventStreamError +from synapse.api.events import SynapseEvent from synapse.api.events.room import ( RoomMemberEvent, MessageEvent, FeedbackEvent, RoomTopicEvent ) @@ -28,17 +29,17 @@ import logging logger = logging.getLogger(__name__) -class MessagesStreamData(StreamData): - EVENT_TYPE = MessageEvent.TYPE +class EventsStreamData(StreamData): + EVENT_TYPE = "EventsStream" def __init__(self, hs, room_id=None, feedback=False): - super(MessagesStreamData, self).__init__(hs) + super(EventsStreamData, self).__init__(hs) self.room_id = room_id self.with_feedback = feedback @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit): - (data, latest_ver) = yield self.store.get_message_stream( + def get_rows(self, user_id, from_key, to_key, limit, direction): + data, latest_ver = yield self.store.get_room_events( user_id=user_id, from_key=from_key, to_key=to_key, @@ -50,74 +51,7 @@ class MessagesStreamData(StreamData): @defer.inlineCallbacks def max_token(self): - val = yield self.store.get_max_message_id() - defer.returnValue(val) - - -class RoomMemberStreamData(StreamData): - EVENT_TYPE = RoomMemberEvent.TYPE - - @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit): - (data, latest_ver) = yield self.store.get_room_member_stream( - user_id=user_id, - from_key=from_key, - to_key=to_key - ) - - defer.returnValue((data, latest_ver)) - - @defer.inlineCallbacks - def max_token(self): - val = yield self.store.get_max_room_member_id() - defer.returnValue(val) - - -class FeedbackStreamData(StreamData): - EVENT_TYPE = FeedbackEvent.TYPE - - def __init__(self, hs, room_id=None): - super(FeedbackStreamData, self).__init__(hs) - self.room_id = room_id - - @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit): - (data, latest_ver) = yield self.store.get_feedback_stream( - user_id=user_id, - from_key=from_key, - to_key=to_key, - limit=limit, - room_id=self.room_id - ) - 
defer.returnValue((data, latest_ver)) - - @defer.inlineCallbacks - def max_token(self): - val = yield self.store.get_max_feedback_id() - defer.returnValue(val) - - -class RoomDataStreamData(StreamData): - EVENT_TYPE = RoomTopicEvent.TYPE # TODO need multiple event types - - def __init__(self, hs, room_id=None): - super(RoomDataStreamData, self).__init__(hs) - self.room_id = room_id - - @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit): - (data, latest_ver) = yield self.store.get_room_data_stream( - user_id=user_id, - from_key=from_key, - to_key=to_key, - limit=limit, - room_id=self.room_id - ) - defer.returnValue((data, latest_ver)) - - @defer.inlineCallbacks - def max_token(self): - val = yield self.store.get_max_room_data_id() + val = yield self.store.get_room_events_max_id() defer.returnValue(val) @@ -136,6 +70,15 @@ class EventStream(PaginationStream): pagination_config.from_tok) pagination_config.to_tok = yield self.fix_token( pagination_config.to_tok) + + if ( + not pagination_config.to_tok + and pagination_config.direction == 'f' + ): + pagination_config.to_tok = yield self.get_current_max_token() + + logger.debug("pagination_config: %s", pagination_config) + defer.returnValue(pagination_config) @defer.inlineCallbacks @@ -147,39 +90,42 @@ class EventStream(PaginationStream): Returns: The fixed-up token, which may == token. """ - # replace TOK_START and TOK_END with 0_0_0 or -1_-1_-1 depending. - replacements = [ - (PaginationStream.TOK_START, "0"), - (PaginationStream.TOK_END, "-1") - ] - for magic_token, key in replacements: - if magic_token == token: - token = EventStream.SEPARATOR.join( - [key] * len(self.stream_data) - ) - - # replace -1 values with an actual pkey - token_segments = self._split_token(token) - for i, tok in enumerate(token_segments): - if tok == -1: - # add 1 to the max token because results are EXCLUSIVE from the - # latest version. - token_segments[i] = 1 + (yield self.stream_data[i].max_token()) - defer.returnValue(EventStream.SEPARATOR.join( - str(x) for x in token_segments - )) + if token == PaginationStream.TOK_END: + new_token = yield self.get_current_max_token() + + logger.debug("fix_token: From %s to %s", token, new_token) + + token = new_token + + defer.returnValue(token) @defer.inlineCallbacks - def get_chunk(self, config=None): + def get_current_max_token(self): + new_token_parts = [] + for s in self.stream_data: + mx = yield s.max_token() + new_token_parts.append(str(mx)) + + new_token = EventStream.SEPARATOR.join(new_token_parts) + + logger.debug("get_current_max_token: %s", new_token) + + defer.returnValue(new_token) + + @defer.inlineCallbacks + def get_chunk(self, config): # no support for limit on >1 streams, makes no sense. if config.limit and len(self.stream_data) > 1: raise EventStreamError( 400, "Limit not supported on multiplexed streams." ) - (chunk_data, next_tok) = yield self._get_chunk_data(config.from_tok, - config.to_tok, - config.limit) + chunk_data, next_tok = yield self._get_chunk_data( + config.from_tok, + config.to_tok, + config.limit, + config.direction, + ) defer.returnValue({ "chunk": chunk_data, @@ -188,7 +134,7 @@ class EventStream(PaginationStream): }) @defer.inlineCallbacks - def _get_chunk_data(self, from_tok, to_tok, limit): + def _get_chunk_data(self, from_tok, to_tok, limit, direction): """ Get event data between the two tokens. Tokens are SEPARATOR separated values representing pkey values of @@ -206,11 +152,12 @@ class EventStream(PaginationStream): EventStreamError if something went wrong. 
""" # sanity check - if (from_tok.count(EventStream.SEPARATOR) != - to_tok.count(EventStream.SEPARATOR) or - (from_tok.count(EventStream.SEPARATOR) + 1) != - len(self.stream_data)): - raise EventStreamError(400, "Token lengths don't match.") + if to_tok is not None: + if (from_tok.count(EventStream.SEPARATOR) != + to_tok.count(EventStream.SEPARATOR) or + (from_tok.count(EventStream.SEPARATOR) + 1) != + len(self.stream_data)): + raise EventStreamError(400, "Token lengths don't match.") chunk = [] next_ver = [] @@ -224,10 +171,13 @@ class EventStream(PaginationStream): continue (event_chunk, max_pkey) = yield self.stream_data[i].get_rows( - self.user_id, from_pkey, to_pkey, limit + self.user_id, from_pkey, to_pkey, limit, direction, ) - chunk += event_chunk + chunk.extend([ + e.get_dict() if isinstance(e, SynapseEvent) else e + for e in event_chunk + ]) next_ver.append(str(max_pkey)) defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver))) @@ -240,9 +190,8 @@ class EventStream(PaginationStream): Returns: A list of ints. """ - segments = token.split(EventStream.SEPARATOR) - try: - int_segments = [int(x) for x in segments] - except ValueError: - raise EventStreamError(400, "Bad token: %s" % token) - return int_segments + if token: + segments = token.split(EventStream.SEPARATOR) + else: + segments = [None] * len(self.stream_data) + return segments diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py index 580e591aca..984c1558e9 100644 --- a/synapse/federation/handler.py +++ b/synapse/federation/handler.py @@ -63,7 +63,7 @@ class FederationEventHandler(object): Deferred: Resolved when it has successfully been queued for processing. """ - yield self._fill_out_prev_events(event) + yield self.fill_out_prev_events(event) pdu = self.pdu_codec.pdu_from_event(event) @@ -74,10 +74,18 @@ class FederationEventHandler(object): @log_function @defer.inlineCallbacks - def backfill(self, room_id, limit): - # TODO: Work out which destinations to ask for backfill - # self.replication_layer.backfill(dest, room_id, limit) - pass + def backfill(self, dest, room_id, limit): + pdus = yield self.replication_layer.backfill(dest, room_id, limit) + + if not pdus: + defer.returnValue([]) + + events = [ + self.pdu_codec.event_from_pdu(pdu) + for pdu in pdus + ] + + defer.returnValue(events) @log_function def get_state_for_room(self, destination, room_id): @@ -87,7 +95,7 @@ class FederationEventHandler(object): @log_function @defer.inlineCallbacks - def on_receive_pdu(self, pdu): + def on_receive_pdu(self, pdu, backfilled): """ Called by the ReplicationLayer when we have a new pdu. We need to do auth checks and put it throught the StateHandler. 
""" @@ -95,7 +103,7 @@ class FederationEventHandler(object): try: with (yield self.lock_manager.lock(pdu.context)): - if event.is_state: + if event.is_state and not backfilled: is_new_state = yield self.state_handler.handle_new_state( pdu ) @@ -104,7 +112,7 @@ class FederationEventHandler(object): else: is_new_state = False - yield self.event_handler.on_receive(event, is_new_state) + yield self.event_handler.on_receive(event, is_new_state, backfilled) except AuthError: # TODO: Implement something in federation that allows us to @@ -129,7 +137,7 @@ class FederationEventHandler(object): yield self.event_handler.on_receive(new_state_event) @defer.inlineCallbacks - def _fill_out_prev_events(self, event): + def fill_out_prev_events(self, event): if hasattr(event, "prev_events"): return diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index c9f2e06b7b..8030d0963f 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -209,7 +209,7 @@ class ReplicationLayer(object): pdus = [Pdu(outlier=False, **p) for p in transaction.pdus] for pdu in pdus: - yield self._handle_new_pdu(pdu) + yield self._handle_new_pdu(pdu, backfilled=True) defer.returnValue(pdus) @@ -416,7 +416,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def _handle_new_pdu(self, pdu): + def _handle_new_pdu(self, pdu, backfilled=False): # We reprocess pdus when we have seen them only as outliers existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin) @@ -452,7 +452,10 @@ class ReplicationLayer(object): # Persist the Pdu, but don't mark it as processed yet. yield self.pdu_actions.persist_received(pdu) - ret = yield self.handler.on_receive_pdu(pdu) + if not backfilled: + ret = yield self.handler.on_receive_pdu(pdu, backfilled=backfilled) + else: + ret = None yield self.pdu_actions.mark_as_processed(pdu) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 3af7d824a2..6bb797caf2 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -17,8 +17,7 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.api.streams.event import ( - EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData, - RoomDataStreamData + EventStream, EventsStreamData ) from synapse.handlers.presence import PresenceStreamData @@ -26,10 +25,7 @@ from synapse.handlers.presence import PresenceStreamData class EventStreamHandler(BaseHandler): stream_data_classes = [ - MessagesStreamData, - RoomMemberStreamData, - FeedbackStreamData, - RoomDataStreamData, + EventsStreamData, PresenceStreamData, ] diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7026df90a2..0430a8307e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -35,7 +35,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def on_receive(self, event, is_new_state): + def on_receive(self, event, is_new_state, backfilled): if hasattr(event, "state_key") and not is_new_state: logger.debug("Ignoring old state.") return @@ -70,6 +70,21 @@ class FederationHandler(BaseHandler): else: with (yield self.room_lock.lock(event.room_id)): - store_id = yield self.store.persist_event(event) + store_id = yield self.store.persist_event(event, backfilled) - yield self.notifier.on_new_room_event(event, store_id) + if not backfilled: + yield self.notifier.on_new_room_event(event, store_id) + + + @log_function + @defer.inlineCallbacks + def backfill(self, dest, 
room_id, limit): + events = yield self.hs.get_federation().backfill(dest, room_id, limit) + + for event in events: + try: + yield self.store.persist_event(event, backfilled=True) + except: + logger.exception("Failed to persist event: %s", event) + + defer.returnValue(events) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7e725d1027..60684f17d7 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -688,7 +688,7 @@ class PresenceStreamData(StreamData): super(PresenceStreamData, self).__init__(hs) self.presence = hs.get_handlers().presence_handler - def get_rows(self, user_id, from_key, to_key, limit): + def get_rows(self, user_id, from_key, to_key, limit, direction): cachemap = self.presence._user_cachemap # TODO(paul): limit, and filter by visibility diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5d0379254b..40867ae2e0 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -23,7 +23,7 @@ from synapse.api.events.room import ( RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent, RoomConfigEvent ) -from synapse.api.streams.event import EventStream, MessagesStreamData +from synapse.api.streams.event import EventStream, EventsStreamData from synapse.util import stringutils from ._base import BaseHandler @@ -59,12 +59,14 @@ class MessageHandler(BaseHandler): yield self.auth.check_joined_room(room_id, user_id) # Pull out the message from the db - msg = yield self.store.get_message(room_id=room_id, - msg_id=msg_id, - user_id=sender_id) +# msg = yield self.store.get_message( +# room_id=room_id, +# msg_id=msg_id, +# user_id=sender_id +# ) + + # TODO (erikj): Once we work out the correct c-s api we need to think on how to do this. - if msg: - defer.returnValue(msg) defer.returnValue(None) @defer.inlineCallbacks @@ -114,8 +116,9 @@ class MessageHandler(BaseHandler): """ yield self.auth.check_joined_room(room_id, user_id) - data_source = [MessagesStreamData(self.hs, room_id=room_id, - feedback=feedback)] + data_source = [ + EventsStreamData(self.hs, room_id=room_id, feedback=feedback) + ] event_stream = EventStream(user_id, data_source) pagin_config = yield event_stream.fix_tokens(pagin_config) data_chunk = yield event_stream.get_chunk(config=pagin_config) @@ -141,12 +144,7 @@ class MessageHandler(BaseHandler): yield self.state_handler.handle_new_event(event) # store in db - store_id = yield self.store.store_room_data( - room_id=event.room_id, - etype=event.type, - state_key=event.state_key, - content=json.dumps(event.content) - ) + store_id = yield self.store.persist_event(event) event.destinations = yield self.store.get_joined_hosts_for_room( event.room_id @@ -201,19 +199,17 @@ class MessageHandler(BaseHandler): raise RoomError( 403, "Member does not meet private room rules.") - data = yield self.store.get_room_data(room_id, event_type, state_key) + data = yield self.store.get_current_state( + room_id, event_type, state_key + ) defer.returnValue(data) @defer.inlineCallbacks - def get_feedback(self, room_id=None, msg_sender_id=None, msg_id=None, - user_id=None, fb_sender_id=None, fb_type=None): - yield self.auth.check_joined_room(room_id, user_id) + def get_feedback(self, event_id): + # yield self.auth.check_joined_room(room_id, user_id) # Pull out the feedback from the db - fb = yield self.store.get_feedback( - room_id=room_id, msg_id=msg_id, msg_sender_id=msg_sender_id, - fb_sender_id=fb_sender_id, fb_type=fb_type - ) + fb = yield self.store.get_feedback(event_id) if fb: defer.returnValue(fb) 
@@ -260,20 +256,35 @@ class MessageHandler(BaseHandler): user_id=user_id, membership_list=[Membership.INVITE, Membership.JOIN] ) - for room_info in room_list: - if room_info["membership"] != Membership.JOIN: + + ret = [] + + for event in room_list: + d = { + "room_id": event.room_id, + "membership": event.membership, + } + ret.append(d) + + if event.membership != Membership.JOIN: continue try: - event_chunk = yield self.get_messages( - user_id=user_id, - pagin_config=pagin_config, - feedback=feedback, - room_id=room_info["room_id"] + messages, token = yield self.store.get_recent_events_for_room( + event.room_id, + limit=50, ) - room_info["messages"] = event_chunk + + d["messages"] = { + "chunk": [m.get_dict() for m in messages], + "start": token[0], + "end": token[1], + } except: - pass - defer.returnValue(room_list) + logger.exception("Failed to get snapshot") + + logger.debug("snapshot_all_rooms returning: %s", ret) + + defer.returnValue(ret) class RoomCreationHandler(BaseHandler): @@ -451,11 +462,11 @@ class RoomMemberHandler(BaseHandler): member_list = yield self.store.get_room_members(room_id=room_id) event_list = [ - entry.as_event(self.event_factory).get_dict() + entry.get_dict() for entry in member_list ] chunk_data = { - "start": "START", + "start": "START", # FIXME (erikj): START is no longer a valid value "end": "END", "chunk": event_list } @@ -495,7 +506,7 @@ class RoomMemberHandler(BaseHandler): SynapseError if there was a problem changing the membership. """ - #broadcast_msg = False + # broadcast_msg = False prev_state = yield self.store.get_room_member( event.target_user_id, event.room_id @@ -569,7 +580,8 @@ class RoomMemberHandler(BaseHandler): defer.returnValue({"room_id": room_id}) @defer.inlineCallbacks - def _do_join(self, event, room_host=None, do_auth=True, broadcast_msg=True): + def _do_join(self, event, room_host=None, do_auth=True, + broadcast_msg=True): joinee = self.hs.parse_userid(event.target_user_id) # room_id = RoomID.from_string(event.room_id, self.hs) room_id = event.room_id @@ -598,7 +610,7 @@ class RoomMemberHandler(BaseHandler): if prev_state and prev_state.membership == Membership.INVITE: room = yield self.store.get_room(room_id) inviter = UserID.from_string( - prev_state.sender, self.hs + prev_state.user_id, self.hs ) should_do_dance = not inviter.is_mine and not room @@ -620,7 +632,6 @@ class RoomMemberHandler(BaseHandler): broadcast_msg=broadcast_msg, ) - if should_do_dance: yield self._do_invite_join_dance( room_id=room_id, @@ -694,18 +705,12 @@ class RoomMemberHandler(BaseHandler): user_id=user.to_string(), membership_list=membership_list ) - defer.returnValue([r["room_id"] for r in rooms]) + defer.returnValue([r.room_id for r in rooms]) @defer.inlineCallbacks def _do_local_membership_update(self, event, membership, broadcast_msg): # store membership - store_id = yield self.store.store_room_member( - user_id=event.target_user_id, - sender=event.user_id, - room_id=event.room_id, - content=event.content, - membership=membership - ) + store_id = yield self.store.persist_event(event) # Send a PDU to all hosts who have joined the room. 
destinations = yield self.store.get_joined_hosts_for_room( @@ -760,7 +765,7 @@ class RoomMemberHandler(BaseHandler): room_id, "", is_public=False ) - #yield self.state_handler.handle_new_event(event) + # yield self.state_handler.handle_new_event(event) yield federation.handle_new_event(new_event) yield federation.get_state_for_room( target_host, room_id @@ -805,5 +810,6 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def get_public_room_list(self): - chunk = yield self.store.get_rooms(is_public=True, with_topics=True) + chunk = yield self.store.get_rooms(is_public=True) + # FIXME (erikj): START is no longer a valid value defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) diff --git a/synapse/rest/room.py b/synapse/rest/room.py index db8f18e8b3..1c48e63628 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -115,7 +115,7 @@ class RoomTopicRestServlet(RestServlet): if not data: raise SynapseError(404, "Topic not found.", errcode=Codes.NOT_FOUND) - defer.returnValue((200, json.loads(data.content))) + defer.returnValue((200, data.content)) @defer.inlineCallbacks def on_PUT(self, request, room_id): @@ -177,7 +177,7 @@ class RoomMemberRestServlet(RestServlet): if not member: raise SynapseError(404, "Member not found.", errcode=Codes.NOT_FOUND) - defer.returnValue((200, json.loads(member.content))) + defer.returnValue((200, member.content)) @defer.inlineCallbacks def on_DELETE(self, request, roomid, target_user_id): @@ -287,25 +287,28 @@ class FeedbackRestServlet(RestServlet): feedback_type): user = yield (self.auth.get_user_by_req(request)) - if feedback_type not in Feedback.LIST: - raise SynapseError(400, "Bad feedback type.", - errcode=Codes.BAD_JSON) - - msg_handler = self.handlers.message_handler - feedback = yield msg_handler.get_feedback( - room_id=urllib.unquote(room_id), - msg_sender_id=msg_sender_id, - msg_id=msg_id, - user_id=user.to_string(), - fb_sender_id=fb_sender_id, - fb_type=feedback_type - ) - - if not feedback: - raise SynapseError(404, "Feedback not found.", - errcode=Codes.NOT_FOUND) + # TODO (erikj): Implement this? 
+ raise NotImplementedError("Getting feedback is not supported") - defer.returnValue((200, json.loads(feedback.content))) +# if feedback_type not in Feedback.LIST: +# raise SynapseError(400, "Bad feedback type.", +# errcode=Codes.BAD_JSON) +# +# msg_handler = self.handlers.message_handler +# feedback = yield msg_handler.get_feedback( +# room_id=urllib.unquote(room_id), +# msg_sender_id=msg_sender_id, +# msg_id=msg_id, +# user_id=user.to_string(), +# fb_sender_id=fb_sender_id, +# fb_type=feedback_type +# ) +# +# if not feedback: +# raise SynapseError(404, "Feedback not found.", +# errcode=Codes.NOT_FOUND) +# +# defer.returnValue((200, json.loads(feedback.content))) @defer.inlineCallbacks def on_PUT(self, request, room_id, sender_id, msg_id, fb_sender_id, @@ -382,6 +385,21 @@ class RoomMessageListRestServlet(RestServlet): defer.returnValue((200, msgs)) +class RoomTriggerBackfill(RestServlet): + PATTERN = client_path_pattern("/rooms/(?P<room_id>[^/]*)/backfill$") + + @defer.inlineCallbacks + def on_GET(self, request, room_id): + remote_server = urllib.unquote(request.args["remote"][0]) + room_id = urllib.unquote(room_id) + limit = int(request.args["limit"][0]) + + handler = self.handlers.federation_handler + events = yield handler.backfill(remote_server, room_id, limit) + + res = [event.get_dict() for event in events] + defer.returnValue((200, res)) + def _parse_json(request): try: content = json.loads(request.content.read()) @@ -402,3 +420,4 @@ def register_servlets(hs, http_server): RoomMemberListRestServlet(hs).register(http_server) RoomMessageListRestServlet(hs).register(http_server) JoinRoomAliasServlet(hs).register(http_server) + RoomTriggerBackfill(hs).register(http_server) diff --git a/synapse/state.py b/synapse/state.py index 4f8b4d9760..ca8e1ca630 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -86,7 +86,7 @@ class StateHandler(object): else: event.depth = 0 - current_state = yield self.store.get_current_state( + current_state = yield self.store.get_current_state_pdu( key.context, key.type, key.state_key ) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 5d5b5f7c44..470b7b7663 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -13,30 +13,35 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from twisted.internet import defer from synapse.api.events.room import ( RoomMemberEvent, MessageEvent, RoomTopicEvent, FeedbackEvent, - RoomConfigEvent + RoomConfigEvent, RoomNameEvent, ) +from synapse.util.logutils import log_function + from .directory import DirectoryStore from .feedback import FeedbackStore -from .message import MessageStore from .presence import PresenceStore from .profile import ProfileStore from .registration import RegistrationStore from .room import RoomStore from .roommember import RoomMemberStore -from .roomdata import RoomDataStore from .stream import StreamStore from .pdu import StatePduStore, PduStore from .transactions import TransactionStore import json +import logging import os -class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, +logger = logging.getLogger(__name__) + + +class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, PresenceStore, PduStore, StatePduStore, TransactionStore, DirectoryStore): @@ -44,51 +49,139 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, def __init__(self, hs): super(DataStore, self).__init__(hs) self.event_factory = hs.get_event_factory() + self.hs = hs - def persist_event(self, event): - if event.type == MessageEvent.TYPE: - return self.store_message( - user_id=event.user_id, - room_id=event.room_id, - msg_id=event.msg_id, - content=json.dumps(event.content) - ) - elif event.type == RoomMemberEvent.TYPE: - return self.store_room_member( - user_id=event.target_user_id, - sender=event.user_id, - room_id=event.room_id, - content=event.content, - membership=event.content["membership"] - ) + self.min_token_deferred = self._get_min_token() + self.min_token = None + + @defer.inlineCallbacks + @log_function + def persist_event(self, event, backfilled=False): + if event.type == RoomMemberEvent.TYPE: + yield self._store_room_member(event) elif event.type == FeedbackEvent.TYPE: - return self.store_feedback( - room_id=event.room_id, - msg_id=event.msg_id, - msg_sender_id=event.msg_sender_id, - fb_sender_id=event.user_id, - fb_type=event.feedback_type, - content=json.dumps(event.content) - ) + yield self._store_feedback(event) +# elif event.type == RoomConfigEvent.TYPE: +# yield self._store_room_config(event) + elif event.type == RoomNameEvent.TYPE: + yield self._store_room_name(event) elif event.type == RoomTopicEvent.TYPE: - return self.store_room_data( - room_id=event.room_id, - etype=event.type, - state_key=event.state_key, - content=json.dumps(event.content) + yield self._store_room_topic(event) + + ret = yield self._store_event(event, backfilled) + defer.returnValue(ret) + + @defer.inlineCallbacks + def get_event(self, event_id): + events_dict = yield self._simple_select_one( + "events", + {"event_id": event_id}, + [ + "event_id", + "type", + "sender", + "room_id", + "content", + "unrecognized_keys" + ], + ) + + event = self._parse_event_from_row(events_dict) + defer.returnValue(event) + + @defer.inlineCallbacks + @log_function + def _store_event(self, event, backfilled): + # FIXME (erikj): This should be removed when we start amalgamating + # event and pdu storage + yield self.hs.get_federation().fill_out_prev_events(event) + + vals = { + "topological_ordering": event.depth, + "event_id": event.event_id, + "type": event.type, + "room_id": event.room_id, + "content": json.dumps(event.content), + "processed": True, + } + + if backfilled: + if not self.min_token_deferred.called: + yield self.min_token_deferred + self.min_token -= 1 + 
vals["stream_ordering"] = self.min_token + + unrec = { + k: v + for k, v in event.get_full_dict().items() + if k not in vals.keys() + } + vals["unrecognized_keys"] = json.dumps(unrec) + + try: + yield self._simple_insert("events", vals) + except: + logger.exception("Failed to persist, probably duplicate") + return + + if not backfilled and hasattr(event, "state_key"): + vals = { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } + + if hasattr(event, "prev_state"): + vals["prev_state"] = event.prev_state + + yield self._simple_insert("state_events", vals) + + yield self._simple_insert( + "current_state_events", + { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } ) - elif event.type == RoomConfigEvent.TYPE: - if "visibility" in event.content: - visibility = event.content["visibility"] - return self.store_room_config( - room_id=event.room_id, - visibility=visibility - ) + latest = yield self.get_room_events_max_id() + defer.returnValue(latest) + + @defer.inlineCallbacks + def get_current_state(self, room_id, event_type=None, state_key=""): + sql = ( + "SELECT e.* FROM events as e " + "INNER JOIN current_state_events as c ON e.event_id = c.event_id " + "INNER JOIN state_events as s ON e.event_id = s.event_id " + "WHERE c.room_id = ? " + ) + + if event_type: + sql += " AND s.type = ? AND s.state_key = ? " + args = (room_id, event_type, state_key) else: - raise NotImplementedError( - "Don't know how to persist type=%s" % event.type - ) + args = (room_id, ) + + results = yield self._execute_and_decode(sql, *args) + + defer.returnValue([self._parse_event_from_row(r) for r in results]) + + @defer.inlineCallbacks + def _get_min_token(self): + row = yield self._execute( + None, + "SELECT MIN(stream_ordering) FROM events" + ) + + self.min_token = row[0][0] if row and row[0] and row[0][0] else -1 + self.min_token = min(self.min_token, -1) + + logger.debug("min_token is: %s", self.min_token) + + defer.returnValue(self.min_token) def schema_path(schema): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index bf1800f4bf..36cc57c1b8 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- import logging from twisted.internet import defer @@ -20,6 +19,9 @@ from twisted.internet import defer from synapse.api.errors import StoreError import collections +import copy +import json + logger = logging.getLogger(__name__) @@ -29,6 +31,7 @@ class SQLBaseStore(object): def __init__(self, hs): self.hs = hs self._db_pool = hs.get_db_pool() + self.event_factory = hs.get_event_factory() self._clock = hs.get_clock() def cursor_to_dict(self, cursor): @@ -57,14 +60,22 @@ class SQLBaseStore(object): The result of decoder(results) """ logger.debug( - "[SQL] %s Args=%s Func=%s", query, args, decoder.__name__ + "[SQL] %s Args=%s Func=%s", + query, args, decoder.__name__ if decoder else None ) def interaction(txn): cursor = txn.execute(query, args) - return decoder(cursor) + if decoder: + return decoder(cursor) + else: + return cursor.fetchall() + return self._db_pool.runInteraction(interaction) + def _execute_and_decode(self, query, *args): + return self._execute(self.cursor_to_dict, query, *args) + # "Simple" SQL API methods that operate on a single table with no JOINs, # no complex WHERE clauses, just a dict of values for columns. @@ -281,6 +292,17 @@ class SQLBaseStore(object): return self._db_pool.runInteraction(func) + def _parse_event_from_row(self, row_dict): + d = copy.deepcopy({k: v for k, v in row_dict.items() if v}) + d.update(json.loads(row_dict["unrecognized_keys"])) + d["content"] = json.loads(d["content"]) + del d["unrecognized_keys"] + + return self.event_factory.create_event( + etype=d["type"], + **d + ) + class Table(object): """ A base class used to store information about a particular table. diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py index 9bd562c762..e60f98d1e1 100644 --- a/synapse/storage/feedback.py +++ b/synapse/storage/feedback.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + from ._base import SQLBaseStore, Table from synapse.api.events.room import FeedbackEvent @@ -22,54 +24,28 @@ import json class FeedbackStore(SQLBaseStore): - def store_feedback(self, room_id, msg_id, msg_sender_id, - fb_sender_id, fb_type, content): - return self._simple_insert(FeedbackTable.table_name, dict( - room_id=room_id, - msg_id=msg_id, - msg_sender_id=msg_sender_id, - fb_sender_id=fb_sender_id, - fb_type=fb_type, - content=content, - )) - - def get_feedback(self, room_id=None, msg_id=None, msg_sender_id=None, - fb_sender_id=None, fb_type=None): - query = FeedbackTable.select_statement( - "msg_sender_id = ? AND room_id = ? AND msg_id = ? " + - "AND fb_sender_id = ? AND feedback_type = ? " + - "ORDER BY id DESC LIMIT 1") - return self._execute( - FeedbackTable.decode_single_result, - query, msg_sender_id, room_id, msg_id, fb_sender_id, fb_type, + def _store_feedback(self, event): + return self._simple_insert("feedback", { + "event_id": event.event_id, + "feedback_type": event.feedback_type, + "room_id": event.room_id, + "target_event_id": event.target_event, + "sender": event.user_id, + }) + + @defer.inlineCallbacks + def get_feedback_for_event(self, event_id): + sql = ( + "SELECT events.* FROM events INNER JOIN feedback " + "ON events.event_id = feedback.event_id " + "WHERE feedback.target_event_id = ? 
" ) - def get_max_feedback_id(self): - return self._simple_max_id(FeedbackTable.table_name) - - -class FeedbackTable(Table): - table_name = "feedback" + rows = yield self._execute_and_decode(sql, event_id) - fields = [ - "id", - "content", - "feedback_type", - "fb_sender_id", - "msg_id", - "room_id", - "msg_sender_id" - ] - - class EntryType(collections.namedtuple("FeedbackEntry", fields)): - - def as_event(self, event_factory): - return event_factory.create_event( - etype=FeedbackEvent.TYPE, - room_id=self.room_id, - msg_id=self.msg_id, - msg_sender_id=self.msg_sender_id, - user_id=self.fb_sender_id, - feedback_type=self.feedback_type, - content=json.loads(self.content), - ) + defer.returnValue( + [ + self._parse_event_from_row(r) + for r in rows + ] + ) diff --git a/synapse/storage/message.py b/synapse/storage/message.py deleted file mode 100644 index 7bb69c1384..0000000000 --- a/synapse/storage/message.py +++ /dev/null @@ -1,81 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ._base import SQLBaseStore, Table -from synapse.api.events.room import MessageEvent - -import collections -import json - - -class MessageStore(SQLBaseStore): - - def get_message(self, user_id, room_id, msg_id): - """Get a message from the store. - - Args: - user_id (str): The ID of the user who sent the message. - room_id (str): The room the message was sent in. - msg_id (str): The unique ID for this user/room combo. - """ - query = MessagesTable.select_statement( - "user_id = ? AND room_id = ? AND msg_id = ? " + - "ORDER BY id DESC LIMIT 1") - return self._execute( - MessagesTable.decode_single_result, - query, user_id, room_id, msg_id, - ) - - def store_message(self, user_id, room_id, msg_id, content): - """Store a message in the store. - - Args: - user_id (str): The ID of the user who sent the message. - room_id (str): The room the message was sent in. - msg_id (str): The unique ID for this user/room combo. - content (str): The content of the message (JSON) - """ - return self._simple_insert(MessagesTable.table_name, dict( - user_id=user_id, - room_id=room_id, - msg_id=msg_id, - content=content, - )) - - def get_max_message_id(self): - return self._simple_max_id(MessagesTable.table_name) - - -class MessagesTable(Table): - table_name = "messages" - - fields = [ - "id", - "user_id", - "room_id", - "msg_id", - "content" - ] - - class EntryType(collections.namedtuple("MessageEntry", fields)): - - def as_event(self, event_factory): - return event_factory.create_event( - etype=MessageEvent.TYPE, - room_id=self.room_id, - user_id=self.user_id, - msg_id=self.msg_id, - content=json.loads(self.content), - ) diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index 13adc581e1..7655f43ede 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from twisted.internet import defer + from ._base import SQLBaseStore, Table, JoinHelper from synapse.util.logutils import log_function @@ -319,6 +321,7 @@ class PduStore(SQLBaseStore): return [(row[0], row[1], row[2]) for row in results] + @defer.inlineCallbacks def get_oldest_pdus_in_context(self, context): """Get a list of Pdus that we haven't backfilled beyond yet (and haven't seen). This list is used when we want to backfill backwards and is the @@ -331,17 +334,14 @@ class PduStore(SQLBaseStore): Returns: list: A list of PduIdTuple. """ - return self._db_pool.runInteraction( - self._get_oldest_pdus_in_context, context - ) - - def _get_oldest_pdus_in_context(self, txn, context): - txn.execute( + results = yield self._execute( + None, "SELECT pdu_id, origin FROM %(back)s WHERE context = ?" % {"back": PduBackwardExtremitiesTable.table_name, }, - (context,) + context ) - return [PduIdTuple(i, o) for i, o in txn.fetchall()] + + defer.returnValue([PduIdTuple(i, o) for i, o in results]) def is_pdu_new(self, pdu_id, origin, context, depth): """For a given Pdu, try and figure out if it's 'new', i.e., if it's @@ -580,7 +580,7 @@ class StatePduStore(SQLBaseStore): txn.execute(query, query_args) - def get_current_state(self, context, pdu_type, state_key): + def get_current_state_pdu(self, context, pdu_type, state_key): """For a given context, pdu_type, state_key 3-tuple, return what is currently considered the current state. @@ -595,10 +595,10 @@ class StatePduStore(SQLBaseStore): """ return self._db_pool.runInteraction( - self._get_current_state, context, pdu_type, state_key + self._get_current_state_pdu, context, pdu_type, state_key ) - def _get_current_state(self, txn, context, pdu_type, state_key): + def _get_current_state_pdu(self, txn, context, pdu_type, state_key): return self._get_current_interaction(txn, context, pdu_type, state_key) def _get_current_interaction(self, txn, context, pdu_type, state_key): diff --git a/synapse/storage/room.py b/synapse/storage/room.py index a97162831b..22f2dcca45 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -76,49 +76,80 @@ class RoomStore(SQLBaseStore): ) @defer.inlineCallbacks - def get_rooms(self, is_public, with_topics): + def get_rooms(self, is_public): """Retrieve a list of all public rooms. Args: is_public (bool): True if the rooms returned should be public. - with_topics (bool): True to include the current topic for the room - in the response. Returns: - A list of room dicts containing at least a "room_id" key, and a - "topic" key if one is set and with_topic=True. + A list of room dicts containing at least a "room_id" key, a + "topic" key if one is set, and a "name" key if one is set """ - room_data_type = RoomTopicEvent.TYPE - public = 1 if is_public else 0 - - latest_topic = ("SELECT max(room_data.id) FROM room_data WHERE " - + "room_data.type = ? 
GROUP BY room_id") - - query = ("SELECT rooms.*, room_data.content, room_alias FROM rooms " - + "LEFT JOIN " - + "room_aliases ON room_aliases.room_id = rooms.room_id " - + "LEFT JOIN " - + "room_data ON rooms.room_id = room_data.room_id WHERE " - + "(room_data.id IN (" + latest_topic + ") " - + "OR room_data.id IS NULL) AND rooms.is_public = ?") - - res = yield self._execute( - self.cursor_to_dict, query, room_data_type, public + + topic_subquery = ( + "SELECT topics.event_id as event_id, " + "topics.room_id as room_id, topic " + "FROM topics " + "INNER JOIN current_state_events as c " + "ON c.event_id = topics.event_id " ) - # return only the keys the specification expects - ret_keys = ["room_id", "topic", "room_alias"] + name_subquery = ( + "SELECT room_names.event_id as event_id, " + "room_names.room_id as room_id, name " + "FROM room_names " + "INNER JOIN current_state_events as c " + "ON c.event_id = room_names.event_id " + ) - # extract topic from the json (icky) FIXME - for i, room_row in enumerate(res): - try: - content_json = json.loads(room_row["content"]) - room_row["topic"] = content_json["topic"] - except: - pass # no topic set - # filter the dict based on ret_keys - res[i] = {k: v for k, v in room_row.iteritems() if k in ret_keys} + # We use non printing ascii character US () as a seperator + sql = ( + "SELECT r.room_id, n.name, t.topic, " + "group_concat(a.room_alias, '') " + "FROM rooms AS r " + "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id " + "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id " + "INNER JOIN room_aliases AS a ON a.room_id = r.room_id " + "WHERE r.is_public = ? " + "GROUP BY r.room_id " + ) % { + "topic": topic_subquery, + "name": name_subquery, + } + + rows = yield self._execute(None, sql, is_public) + + ret = [ + { + "room_id": r[0], + "name": r[1], + "topic": r[2], + "aliases": r[3].split(""), + } + for r in rows + ] + + defer.returnValue(ret) + + def _store_room_topic(self, event): + return self._simple_insert( + "topics", + { + "event_id": event.event_id, + "room_id": event.room_id, + "topic": event.topic, + } + ) - defer.returnValue(res) + def _store_room_name(self, event): + return self._simple_insert( + "room_names", + { + "event_id": event.event_id, + "room_id": event.room_id, + "name": event.name, + } + ) class RoomsTable(Table): diff --git a/synapse/storage/roomdata.py b/synapse/storage/roomdata.py deleted file mode 100644 index cc04d1ba14..0000000000 --- a/synapse/storage/roomdata.py +++ /dev/null @@ -1,85 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ._base import SQLBaseStore, Table - -import collections -import json - - -class RoomDataStore(SQLBaseStore): - - """Provides various CRUD operations for Room Events. """ - - def get_room_data(self, room_id, etype, state_key=""): - """Retrieve the data stored under this type and state_key. - - Args: - room_id (str) - etype (str) - state_key (str) - Returns: - namedtuple: Or None if nothing exists at this path. 
- """ - query = RoomDataTable.select_statement( - "room_id = ? AND type = ? AND state_key = ? " - "ORDER BY id DESC LIMIT 1" - ) - return self._execute( - RoomDataTable.decode_single_result, - query, room_id, etype, state_key, - ) - - def store_room_data(self, room_id, etype, state_key="", content=None): - """Stores room specific data. - - Args: - room_id (str) - etype (str) - state_key (str) - data (str)- The data to store for this path in JSON. - Returns: - The store ID for this data. - """ - return self._simple_insert(RoomDataTable.table_name, dict( - etype=etype, - state_key=state_key, - room_id=room_id, - content=content, - )) - - def get_max_room_data_id(self): - return self._simple_max_id(RoomDataTable.table_name) - - -class RoomDataTable(Table): - table_name = "room_data" - - fields = [ - "id", - "room_id", - "type", - "state_key", - "content" - ] - - class EntryType(collections.namedtuple("RoomDataEntry", fields)): - - def as_event(self, event_factory): - return event_factory.create_event( - etype=self.type, - room_id=self.room_id, - content=json.loads(self.content), - ) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index c45d128f1b..89c87290cf 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -31,6 +31,38 @@ logger = logging.getLogger(__name__) class RoomMemberStore(SQLBaseStore): + @defer.inlineCallbacks + def _store_room_member(self, event): + """Store a room member in the database. + """ + domain = self.hs.parse_userid(event.target_user_id).domain + + yield self._simple_insert( + "room_memberships", + { + "event_id": event.event_id, + "user_id": event.target_user_id, + "sender": event.user_id, + "room_id": event.room_id, + "membership": event.membership, + } + ) + + # Update room hosts table + if event.membership == Membership.JOIN: + sql = ( + "INSERT OR IGNORE INTO room_hosts (room_id, host) " + "VALUES (?, ?)" + ) + yield self._execute(None, sql, event.room_id, domain) + else: + sql = ( + "DELETE FROM room_hosts WHERE room_id = ? AND host = ?" + ) + + yield self._execute(None, sql, event.room_id, domain) + + @defer.inlineCallbacks def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. @@ -38,36 +70,15 @@ class RoomMemberStore(SQLBaseStore): user_id (str): The member's user ID. room_id (str): The room the member is in. Returns: - namedtuple: The room member from the database, or None if this - member does not exist. + Deferred: Results in a MembershipEvent or None. """ - query = RoomMemberTable.select_statement( - "room_id = ? AND user_id = ? ORDER BY id DESC LIMIT 1") - return self._execute( - RoomMemberTable.decode_single_result, - query, room_id, user_id, - ) + rows = yield self._get_members_by_dict({ + "e.room_id": room_id, + "m.user_id": user_id, + }) - def store_room_member(self, user_id, sender, room_id, membership, content): - """Store a room member in the database. + defer.returnValue(rows[0] if rows else None) - Args: - user_id (str): The member's user ID. - room_id (str): The room in relation to the member. - membership (synapse.api.constants.Membership): The new membership - state. - content (dict): The content of the membership (JSON). 
- """ - content_json = json.dumps(content) - return self._simple_insert(RoomMemberTable.table_name, dict( - user_id=user_id, - sender=sender, - room_id=room_id, - membership=membership, - content=content_json, - )) - - @defer.inlineCallbacks def get_room_members(self, room_id, membership=None): """Retrieve the current room member list for a room. @@ -79,17 +90,12 @@ class RoomMemberStore(SQLBaseStore): Returns: list of namedtuples representing the members in this room. """ - query = RoomMemberTable.select_statement( - "id IN (SELECT MAX(id) FROM " + RoomMemberTable.table_name - + " WHERE room_id = ? GROUP BY user_id)" - ) - res = yield self._execute( - RoomMemberTable.decode_results, query, room_id, - ) - # strip memberships which don't match + + where = {"m.room_id": room_id} if membership: - res = [entry for entry in res if entry.membership == membership] - defer.returnValue(res) + where["m.membership"] = membership + + return self._get_members_by_dict(where) def get_rooms_for_user_where_membership_is(self, user_id, membership_list): """ Get all the rooms for this user where the membership for this user @@ -106,70 +112,40 @@ class RoomMemberStore(SQLBaseStore): return defer.succeed(None) args = [user_id] - membership_placeholder = ["membership=?"] * len(membership_list) - where_membership = "(" + " OR ".join(membership_placeholder) + ")" - for membership in membership_list: - args.append(membership) - - # sub-select finds the row ID for the most recent (i.e. current) - # state change of this user per room, then the outer select finds those - query = ("SELECT room_id, membership FROM room_memberships" - + " WHERE id IN (SELECT MAX(id) FROM room_memberships" - + " WHERE user_id=? GROUP BY room_id)" - + " AND " + where_membership) - return self._execute( - self.cursor_to_dict, query, *args - ) + args.extend(membership_list) - @defer.inlineCallbacks - def get_joined_hosts_for_room(self, room_id): - query = RoomMemberTable.select_statement( - "id IN (SELECT MAX(id) FROM " + RoomMemberTable.table_name - + " WHERE room_id = ? GROUP BY user_id)" - ) - - res = yield self._execute( - RoomMemberTable.decode_results, query, room_id, + where_clause = "user_id = ? AND (%s)" % ( + " OR ".join(["membership = ?" for _ in membership_list]), ) - def host_from_user_id_string(user_id): - domain = UserID.from_string(entry.user_id, self.hs).domain - return domain - - # strip memberships which don't match - hosts = [ - host_from_user_id_string(entry.user_id) - for entry in res - if entry.membership == Membership.JOIN - ] + return self._get_members_query(where_clause, args) - logger.debug("Returning hosts: %s from results: %s", hosts, res) - - defer.returnValue(hosts) - - def get_max_room_member_id(self): - return self._simple_max_id(RoomMemberTable.table_name) - - -class RoomMemberTable(Table): - table_name = "room_memberships" - - fields = [ - "id", - "user_id", - "sender", - "room_id", - "membership", - "content" - ] + def get_joined_hosts_for_room(self, room_id): + return self._simple_select_onecol( + "room_hosts", + {"room_id": room_id}, + "host" + ) - class EntryType(collections.namedtuple("RoomMemberEntry", fields)): + def _get_members_by_dict(self, where_dict): + clause = " AND ".join("%s = ?" 
% k for k in where_dict.keys()) + vals = where_dict.values() + return self._get_members_query(clause, vals) - def as_event(self, event_factory): - return event_factory.create_event( - etype=RoomMemberEvent.TYPE, - room_id=self.room_id, - target_user_id=self.user_id, - user_id=self.sender, - content=json.loads(self.content), - ) + @defer.inlineCallbacks + def _get_members_query(self, where_clause, where_values): + sql = ( + "SELECT e.* FROM events as e " + "INNER JOIN room_memberships as m " + "ON e.event_id = m.event_id " + "INNER JOIN current_state_events as c " + "ON m.event_id = c.event_id " + "WHERE %s " + ) % (where_clause,) + + rows = yield self._execute_and_decode(sql, *where_values) + + logger.debug("_get_members_query Got rows %s", rows) + + results = [self._parse_event_from_row(r) for r in rows] + defer.returnValue(results) diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 77096546b2..ea04261ff0 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -12,43 +12,70 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -CREATE TABLE IF NOT EXISTS rooms( - room_id TEXT PRIMARY KEY NOT NULL, - is_public INTEGER, - creator TEXT + +CREATE TABLE IF NOT EXISTS events( + stream_ordering INTEGER PRIMARY KEY AUTOINCREMENT, + topological_ordering INTEGER NOT NULL, + event_id TEXT NOT NULL, + type TEXT NOT NULL, + room_id TEXT NOT NULL, + content TEXT NOT NULL, + unrecognized_keys TEXT, + processed BOOL NOT NULL, + CONSTRAINT ev_uniq UNIQUE (event_id) ); -CREATE TABLE IF NOT EXISTS room_memberships( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id TEXT NOT NULL, -- no foreign key to users table, it could be an id belonging to another home server - sender TEXT NOT NULL, +CREATE TABLE IF NOT EXISTS state_events( + event_id TEXT NOT NULL, room_id TEXT NOT NULL, - membership TEXT NOT NULL, - content TEXT NOT NULL + type TEXT NOT NULL, + state_key TEXT NOT NULL, + prev_state TEXT ); -CREATE TABLE IF NOT EXISTS messages( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id TEXT, - room_id TEXT, - msg_id TEXT, - content TEXT +CREATE TABLE IF NOT EXISTS current_state_events( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + type TEXT NOT NULL, + state_key TEXT NOT NULL, + CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE +); + +CREATE TABLE IF NOT EXISTS room_memberships( + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + sender TEXT NOT NULL, + room_id TEXT NOT NULL, + membership TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS feedback( - id INTEGER PRIMARY KEY AUTOINCREMENT, - content TEXT, + event_id TEXT NOT NULL, feedback_type TEXT, - fb_sender_id TEXT, - msg_id TEXT, - room_id TEXT, - msg_sender_id TEXT + target_event_id TEXT, + sender TEXT, + room_id TEXT ); -CREATE TABLE IF NOT EXISTS room_data( - id INTEGER PRIMARY KEY AUTOINCREMENT, +CREATE TABLE IF NOT EXISTS topics( + event_id TEXT NOT NULL, room_id TEXT NOT NULL, - type TEXT NOT NULL, - state_key TEXT NOT NULL, - content TEXT + topic TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS room_names( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + name TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS rooms( + room_id TEXT PRIMARY KEY NOT NULL, + is_public INTEGER, + creator TEXT +); + +CREATE TABLE IF NOT EXISTS room_hosts( + room_id TEXT NOT NULL, + host TEXT NOT NULL ); diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 47a1f2c45a..87fc978813 100644 --- a/synapse/storage/stream.py +++ 
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 47a1f2c45a..87fc978813
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -13,267 +13,264 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
 from ._base import SQLBaseStore
-from .message import MessagesTable
-from .feedback import FeedbackTable
-from .roomdata import RoomDataTable
-from .roommember import RoomMemberTable
+from synapse.api.errors import SynapseError
+from synapse.api.constants import Membership
+from synapse.util.logutils import log_function
 
 import json
 import logging
+
 
 logger = logging.getLogger(__name__)
 
+MAX_STREAM_SIZE = 1000
+
+
+_STREAM_TOKEN = "stream"
+_TOPOLOGICAL_TOKEN = "topological"
+
+
+def _parse_stream_token(string):
+    try:
+        if string[0] != 's':
+            raise
+        return int(string[1:])
+    except:
+        logger.debug("Not stream token: %s", string)
+        raise SynapseError(400, "Invalid token")
+
+
+def _parse_topological_token(string):
+    try:
+        if string[0] != 't':
+            raise
+        parts = string[1:].split('-', 1)
+        return (int(parts[0]), int(parts[1]))
+    except:
+        logger.debug("Not topological token: %s", string)
+        raise SynapseError(400, "Invalid token")
+
+
+def is_stream_token(string):
+    try:
+        _parse_stream_token(string)
+        return True
+    except:
+        return False
+
+
+def is_topological_token(string):
+    try:
+        _parse_topological_token(string)
+        return True
+    except:
+        return False
+
+
+def _get_token_bound(token, comparison):
+    try:
+        s = _parse_stream_token(token)
+        return "%s %s %d" % ("stream_ordering", comparison, s)
+    except:
+        pass
+
+    try:
+        top, stream = _parse_topological_token(token)
+        return "%s %s %d AND %s %s %d" % (
+            "topological_ordering", comparison, top,
+            "stream_ordering", comparison, stream,
+        )
+    except:
+        pass
+
+    raise SynapseError(400, "Invalid token")
+
+
 class StreamStore(SQLBaseStore):
+    @log_function
+    def get_room_events(self, user_id, from_key, to_key, room_id, limit=0,
+                        direction='f', with_feedback=False):
+        is_events = (
+            direction == 'f'
+            and is_stream_token(from_key)
+            and to_key and is_stream_token(to_key)
+        )
 
-    def get_message_stream(self, user_id, from_key, to_key, room_id, limit=0,
-                           with_feedback=False):
-        """Get all messages for this user between the given keys.
-
-        Args:
-            user_id (str): The user who is requesting messages.
-            from_key (int): The ID to start returning results from (exclusive).
-            to_key (int): The ID to stop returning results (exclusive).
-            room_id (str): Gets messages only for this room. Can be None, in
-                which case all room messages will be returned.
-        Returns:
-            A tuple of rows (list of namedtuples), new_id(int)
-        """
-        if with_feedback and room_id:  # with fb MUST specify a room ID
-            return self._db_pool.runInteraction(
-                self._get_message_rows_with_feedback,
-                user_id, from_key, to_key, room_id, limit
+        if is_events:
+            return self.get_room_events_stream(
+                user_id=user_id,
+                from_key=from_key,
+                to_key=to_key,
+                room_id=room_id,
+                limit=limit,
+                with_feedback=with_feedback,
             )
         else:
-            return self._db_pool.runInteraction(
-                self._get_message_rows,
-                user_id, from_key, to_key, room_id, limit
+            return self.paginate_room_events(
+                from_key=from_key,
+                to_key=to_key,
+                room_id=room_id,
+                limit=limit,
+                with_feedback=with_feedback,
             )
 
-    def _get_message_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
-                          limit):
-        # work out which rooms this user is joined in on and join them with
-        # the room id on the messages table, bounded by the specified pkeys
+    @defer.inlineCallbacks
+    @log_function
+    def get_room_events_stream(self, user_id, from_key, to_key, room_id,
+                               limit=0, with_feedback=False):
+        # TODO (erikj): Handle compressed feedback
 
-        # get all messages where the *current* membership state is 'join' for
-        # this user in that room.
-        query = ("SELECT messages.* FROM messages WHERE ? IN"
-                 + " (SELECT membership from room_memberships WHERE user_id=?"
-                 + " AND room_id = messages.room_id ORDER BY id DESC LIMIT 1)")
-        query_args = ["join", user_id]
+        current_room_membership_sql = (
+            "SELECT m.room_id FROM room_memberships as m "
+            "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
+            "WHERE m.user_id = ?"
+        )
 
-        if room_id:
-            query += " AND messages.room_id=?"
-            query_args.append(room_id)
+        invites_sql = (
+            "SELECT m.event_id FROM room_memberships as m "
+            "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
+            "WHERE m.user_id = ? AND m.membership = ?"
+        )
 
-        (query, query_args) = self._append_stream_operations(
-            "messages", query, query_args, from_pkey, to_pkey, limit=limit
+        if limit:
+            limit = max(limit, MAX_STREAM_SIZE)
+        else:
+            limit = MAX_STREAM_SIZE
+
+        # From and to keys should be integers from ordering.
+        from_id = _parse_stream_token(from_key)
+        to_id = _parse_stream_token(to_key)
+
+        if from_key == to_key:
+            defer.returnValue(([], to_key))
+            return
+
+        sql = (
+            "SELECT * FROM events as e WHERE "
+            "((room_id IN (%(current)s)) OR "
+            "(event_id IN (%(invites)s))) "
+            "AND e.stream_ordering > ? AND e.stream_ordering < ? "
+            "ORDER BY stream_ordering ASC LIMIT %(limit)d "
+        ) % {
+            "current": current_room_membership_sql,
+            "invites": invites_sql,
+            "limit": limit
+        }
+
+        rows = yield self._execute_and_decode(
+            sql,
+            user_id, user_id, Membership.INVITE, from_id, to_id
         )
-        cursor = txn.execute(query, query_args)
-        return self._as_events(cursor, MessagesTable, from_pkey)
+        ret = [self._parse_event_from_row(r) for r in rows]
 
-    def _get_message_rows_with_feedback(self, txn, user_id, from_pkey, to_pkey,
-                                        room_id, limit):
-        # this col represents the compressed feedback JSON as per spec
-        compressed_feedback_col = (
-            "'[' || group_concat('{\"sender_id\":\"' || f.fb_sender_id"
-            + " || '\",\"feedback_type\":\"' || f.feedback_type"
-            + " || '\",\"content\":' || f.content || '}') || ']'"
-        )
+        if rows:
+            key = "s%d" % max([r["stream_ordering"] for r in rows])
+        else:
+            # Assume we didn't get anything because there was nothing to get.
+            key = to_key
 
-        global_msg_id_join = ("f.room_id = messages.room_id"
-                              + " and f.msg_id = messages.msg_id"
-                              + " and messages.user_id = f.msg_sender_id")
+        defer.returnValue((ret, key))
 
-        select_query = (
-            "SELECT messages.*, f.content AS fb_content, f.fb_sender_id"
-            + ", " + compressed_feedback_col + " AS compressed_fb"
-            + " FROM messages LEFT JOIN feedback f ON " + global_msg_id_join)
+    @defer.inlineCallbacks
+    @log_function
+    def paginate_room_events(self, room_id, from_key, to_key=None,
+                             direction='b', limit=-1,
+                             with_feedback=False):
+        # TODO (erikj): Handle compressed feedback
 
-        current_membership_sub_query = (
-            "(SELECT membership from room_memberships rm"
-            + " WHERE user_id=? AND room_id = rm.room_id"
-            + " ORDER BY id DESC LIMIT 1)")
+        from_comp = '<' if direction =='b' else '>'
+        to_comp = '>' if direction =='b' else '<'
+        order = "DESC" if direction == 'b' else "ASC"
 
-        where = (" WHERE ? IN " + current_membership_sub_query
-                 + " AND messages.room_id=?")
+        args = [room_id]
 
-        query = select_query + where
-        query_args = ["join", user_id, room_id]
+        bounds = _get_token_bound(from_key, from_comp)
+        if to_key:
+            bounds = "%s AND %s" % (bounds, _get_token_bound(to_key, to_comp))
 
-        (query, query_args) = self._append_stream_operations(
-            "messages", query, query_args, from_pkey, to_pkey,
-            limit=limit, group_by=" GROUP BY messages.id "
+        if int(limit) > 0:
+            args.append(int(limit))
+            limit_str = " LIMIT ?"
+        else:
+            limit_str = ""
+
+        sql = (
+            "SELECT * FROM events "
+            "WHERE room_id = ? AND %(bounds)s "
+            "ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s "
+        ) % {"bounds": bounds, "order": order, "limit": limit_str}
+
+        rows = yield self._execute_and_decode(
+            sql,
+            *args
        )
-        cursor = txn.execute(query, query_args)
-
-        # convert the result set into events
-        entries = self.cursor_to_dict(cursor)
-        events = []
-        for entry in entries:
-            # TODO we should spec the cursor > event mapping somewhere else.
-            event = {}
-            straight_mappings = ["msg_id", "user_id", "room_id"]
-            for key in straight_mappings:
-                event[key] = entry[key]
-            event["content"] = json.loads(entry["content"])
-            if entry["compressed_fb"]:
-                event["feedback"] = json.loads(entry["compressed_fb"])
-            events.append(event)
-
-        latest_pkey = from_pkey if len(entries) == 0 else entries[-1]["id"]
-
-        return (events, latest_pkey)
-
-    def get_room_member_stream(self, user_id, from_key, to_key):
-        """Get all room membership events for this user between the given keys.
-
-        Args:
-            user_id (str): The user who is requesting membership events.
-            from_key (int): The ID to start returning results from (exclusive).
-            to_key (int): The ID to stop returning results (exclusive).
-        Returns:
-            A tuple of rows (list of namedtuples), new_id(int)
-        """
-        return self._db_pool.runInteraction(
-            self._get_room_member_rows, user_id, from_key, to_key
+        if rows:
+            topo = rows[-1]["topological_ordering"]
+            toke = rows[-1]["stream_ordering"]
+            next_token = "t%s-%s" % (topo, toke)
+        else:
+            # TODO (erikj): We should work out what to do here instead.
+            next_token = to_key if to_key else from_key
+
+        defer.returnValue(
+            (
+                [self._parse_event_from_row(r) for r in rows],
+                next_token
+            )
         )
 
-    def _get_room_member_rows(self, txn, user_id, from_pkey, to_pkey):
-        # get all room membership events for rooms which the user is
-        # *currently* joined in on, or all invite events for this user.
-        current_membership_sub_query = (
-            "(SELECT membership FROM room_memberships"
-            + " WHERE user_id=? AND room_id = rm.room_id"
-            + " ORDER BY id DESC LIMIT 1)")
-
-        query = ("SELECT rm.* FROM room_memberships rm "
-                 # all membership events for rooms you've currently joined.
-                 + " WHERE (? IN " + current_membership_sub_query
-                 # all invite membership events for this user
-                 + " OR rm.membership=? AND user_id=?)"
-                 + " AND rm.id > ?")
-        query_args = ["join", user_id, "invite", user_id, from_pkey]
-
-        if to_pkey != -1:
-            query += " AND rm.id < ?"
-            query_args.append(to_pkey)
-
-        cursor = txn.execute(query, query_args)
-        return self._as_events(cursor, RoomMemberTable, from_pkey)
-
-    def get_feedback_stream(self, user_id, from_key, to_key, room_id, limit=0):
-        return self._db_pool.runInteraction(
-            self._get_feedback_rows,
-            user_id, from_key, to_key, room_id, limit
+    @defer.inlineCallbacks
+    def get_recent_events_for_room(self, room_id, limit, with_feedback=False):
+        # TODO (erikj): Handle compressed feedback
+
+        end_token = yield self.get_room_events_max_id()
+
+        sql = (
+            "SELECT * FROM events "
+            "WHERE room_id = ? AND stream_ordering <= ? "
+            "ORDER BY topological_ordering, stream_ordering DESC LIMIT ? "
         )
 
-    def _get_feedback_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
-                           limit):
-        # work out which rooms this user is joined in on and join them with
-        # the room id on the feedback table, bounded by the specified pkeys
-
-        # get all messages where the *current* membership state is 'join' for
-        # this user in that room.
-        query = (
-            "SELECT feedback.* FROM feedback WHERE ? IN "
-            + "(SELECT membership from room_memberships WHERE user_id=?"
-            + " AND room_id = feedback.room_id ORDER BY id DESC LIMIT 1)")
-        query_args = ["join", user_id]
-
-        if room_id:
-            query += " AND feedback.room_id=?"
-            query_args.append(room_id)
-
-        (query, query_args) = self._append_stream_operations(
-            "feedback", query, query_args, from_pkey, to_pkey, limit=limit
+        rows = yield self._execute_and_decode(
+            sql,
+            room_id, end_token, limit
         )
-        cursor = txn.execute(query, query_args)
-        return self._as_events(cursor, FeedbackTable, from_pkey)
+        rows.reverse()  # As we selected with reverse ordering
 
-    def get_room_data_stream(self, user_id, from_key, to_key, room_id,
-                             limit=0):
-        return self._db_pool.runInteraction(
-            self._get_room_data_rows,
-            user_id, from_key, to_key, room_id, limit
+        if rows:
+            topo = rows[0]["topological_ordering"]
+            toke = rows[0]["stream_ordering"]
+            start_token = "p%s-%s" % (topo, toke)
+
+            token = (start_token, end_token)
+        else:
+            token = (end_token, end_token)
+
+        defer.returnValue(
+            (
+                [self._parse_event_from_row(r) for r in rows],
+                token
+            )
         )
 
-    def _get_room_data_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
-                            limit):
-        # work out which rooms this user is joined in on and join them with
-        # the room id on the feedback table, bounded by the specified pkeys
-
-        # get all messages where the *current* membership state is 'join' for
-        # this user in that room.
-        query = (
-            "SELECT room_data.* FROM room_data WHERE ? IN "
-            + "(SELECT membership from room_memberships WHERE user_id=?"
-            + " AND room_id = room_data.room_id ORDER BY id DESC LIMIT 1)")
-        query_args = ["join", user_id]
-
-        if room_id:
-            query += " AND room_data.room_id=?"
-            query_args.append(room_id)
-
-        (query, query_args) = self._append_stream_operations(
-            "room_data", query, query_args, from_pkey, to_pkey, limit=limit
+    @defer.inlineCallbacks
+    def get_room_events_max_id(self):
+        res = yield self._execute_and_decode(
+            "SELECT MAX(stream_ordering) as m FROM events"
         )
-        cursor = txn.execute(query, query_args)
-        return self._as_events(cursor, RoomDataTable, from_pkey)
-
-    def _append_stream_operations(self, table_name, query, query_args,
-                                  from_pkey, to_pkey, limit=None,
-                                  group_by=""):
-        LATEST_ROW = -1
-        order_by = ""
-        if to_pkey > from_pkey:
-            if from_pkey != LATEST_ROW:
-                # e.g. from=5 to=9 >> from 5 to 9 >> id>5 AND id<9
-                query += (" AND %s.id > ? AND %s.id < ?" %
-                          (table_name, table_name))
-                query_args.append(from_pkey)
-                query_args.append(to_pkey)
-            else:
-                # e.g. from=-1 to=5 >> from now to 5 >> id>5 ORDER BY id DESC
-                query += " AND %s.id > ? " % table_name
-                order_by = "ORDER BY id DESC"
-                query_args.append(to_pkey)
-        elif from_pkey > to_pkey:
-            if to_pkey != LATEST_ROW:
-                # from=9 to=5 >> from 9 to 5 >> id>5 AND id<9 ORDER BY id DESC
-                query += (" AND %s.id > ? AND %s.id < ? " %
-                          (table_name, table_name))
-                order_by = "ORDER BY id DESC"
-                query_args.append(to_pkey)
-                query_args.append(from_pkey)
-            else:
-                # from=5 to=-1 >> from 5 to now >> id>5
-                query += " AND %s.id > ?" % table_name
-                query_args.append(from_pkey)
-
-        query += group_by + order_by
-
-        if limit and limit > 0:
-            query += " LIMIT ?"
-            query_args.append(str(limit))
-
-        return (query, query_args)
-
-    def _as_events(self, cursor, table, from_pkey):
-        data_entries = table.decode_results(cursor)
-        last_pkey = from_pkey
-        if data_entries:
-            last_pkey = data_entries[-1].id
-
-        events = [
-            entry.as_event(self.event_factory).get_dict()
-            for entry in data_entries
-        ]
-
-        return (events, last_pkey)
+        logger.debug("get_room_events_max_id: %s", res)
+
+        if not res or not res[0] or not res[0]["m"]:
+            defer.returnValue("s1")
+            return
+
+        key = res[0]["m"] + 1
+        defer.returnValue("s%d" % (key,))
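For reference when reading the new StreamStore above: live stream positions are serialised as "s<stream_ordering>" tokens (handled by _parse_stream_token), pagination positions carry a topological/stream pair as "t<topological_ordering>-<stream_ordering>" (handled by _parse_topological_token), and _get_token_bound turns either form into a WHERE-clause fragment. The snippet below is a standalone sketch of that round trip; it mirrors the behaviour of the helpers in the diff rather than importing them, and the example tokens are invented.

# Standalone sketch (illustrative, not the module code) of the token formats
# used by StreamStore: "s<stream>" for live stream tokens, "t<topo>-<stream>"
# for pagination tokens.

def token_to_bound(token, comparison):
    # Mirrors _get_token_bound: turn a token into a SQL bound fragment.
    if token.startswith("s"):
        return "stream_ordering %s %d" % (comparison, int(token[1:]))
    if token.startswith("t"):
        topo, stream = token[1:].split("-", 1)
        return "topological_ordering %s %d AND stream_ordering %s %d" % (
            comparison, int(topo), comparison, int(stream),
        )
    raise ValueError("Invalid token: %r" % (token,))


if __name__ == "__main__":
    # Lower bound for events after live stream position 42:
    print(token_to_bound("s42", ">"))
    # Upper bound when paginating backwards from topological position 7-42:
    print(token_to_bound("t7-42", "<"))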