2009-06-09

Previous: 2009-06-08

Next: 2009-06-12

created: 1245140600|%e %B %Y, %H:%M

The basics of the storage layer are now in place. I'll extend this by adding more classes to it, and more implementations. But first, let's start on the routing layer. My aim is to get an end-to-end demonstration working, with null (fanout) routing.

There has been discussion in the AMQP workgroup of exchanges, and their role. In Xump's design, there is an exchange but it is universal and invisible, and is in fact the Xump instance. While AMQP extends routing functionality by defining new exchange types, Xump extends routing functionality by defining new extensions to the single routing layer. We'll do that using a portal API as we did for the storage layer, but this will come later.

Xump uses an inverted database analogy for routing. The caller defines selectors on queues, and as data flows through a queue, the routing layer applies the selectors on that queue. This is conceptually simple but harder to make than to explain. In AMQP land, some routing entities (exchanges) have no storage, while others (shared queues) do. I don't like the distinction. It should depend on the selectors and the network they feed: if the queue can empty itself immediately, it does so. And if not, it builds up and empties when it can.

So, we are going to design selectors. I'd also thought of calling these triggers, which is a neat abuse of database jargon, but I think it would be inaccurate. Yes, they are triggered by message flow, but they perform a selection action.

Selectors specify matching and/or filtering relationships between queues. Matching and filtering are two different and complementary ways of deciding whether a message M should go to queue Q or not:

  • Matching is complex: every possible address A has a list of selectors that specify it. We thus match a message's address with its corresponding set of selectors, giving a "yes" decision for each of them.
  • Filtering is simple: a selector S inspects a message M and makes a yes/no decision about whether or not it wants that message.

The tradeoffs are:

  • Matching is a fast O(log n) operation but requires that the set of possible addresses is finite and changes slowly.
  • Filtering works on any aspect of the message, or external conditions, but is a slow O(n^2) operation.

Typically, filtering is good for content-based routing, while matching is ideal for topic distribution where the topic tree is finite and slow to change (like stock indices, newsgroup trees, etc.). The ideal combination is to match first, and then filter on the much smaller set of matched selectors.
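
Here is a minimal sketch of the match-then-filter combination, in Python rather than Xump's own code. Everything named here (`match_index`, `add_selector`, `route_candidates`, the dict-shaped message) is hypothetical, and a plain dict lookup stands in for whatever matching structure the routing layer ends up using.

```python
from collections import defaultdict

# Hypothetical index: address -> selectors that asked for that address.
match_index = defaultdict(list)

def add_selector(address, name, predicate=None):
    """Register a selector under an address, with an optional filter."""
    match_index[address].append((name, predicate))

def route_candidates(message):
    """Match on the address first, then filter the (smaller) hit set."""
    # Matching: one lookup gives the interim hit set.
    interim = match_index.get(message["address"], [])
    # Filtering: each matched selector makes its own yes/no decision.
    return [name for name, predicate in interim
            if predicate is None or predicate(message)]

add_selector("stocks.nyse", "all-nyse")
add_selector("stocks.nyse", "big-trades", lambda m: m["volume"] > 1000)
add_selector("stocks.lse", "all-lse")

hits = route_candidates({"address": "stocks.nyse", "volume": 50})
```

The filter only runs for the two selectors registered on "stocks.nyse"; the selector on "stocks.lse" is never consulted.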

When a new message arrives in a queue, we match it against all selectors, giving us an interim hit set. We then filter against each selector in the hit set, giving us a final hit set. Each selector specifies either COPY or MOVE. We do all the COPYs first, and then we do the first MOVE. When we do a MOVE, we push that selector to the end of the list of selectors, so that messages will be round-robined between all MOVE selectors, if there are several.
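
The COPY-then-MOVE rule can be sketched as follows. This is an illustration under stated assumptions, not the Xump implementation: `dispatch`, the `(name, operation)` tuples and the `deliver` callback are all hypothetical, and every selector in the list is assumed to have already matched and passed filtering.

```python
COPY, MOVE = "COPY", "MOVE"

def dispatch(message, selectors, deliver):
    """Deliver to every COPY selector, then to the first MOVE selector.

    `selectors` is the queue's ordered list of (name, operation) pairs.
    Returns True if the message was moved out of the source queue.
    """
    for name, operation in selectors:
        if operation == COPY:
            deliver(name, message)
    first_move = next((s for s in selectors if s[1] == MOVE), None)
    if first_move is None:
        return False          # no MOVE selector: the message stays put
    deliver(first_move[0], message)
    # Push the used MOVE selector to the end so the next message
    # goes to a different MOVE selector (round-robin).
    selectors.remove(first_move)
    selectors.append(first_move)
    return True

log = []
selectors = [("audit", COPY), ("worker-1", MOVE), ("worker-2", MOVE)]
for msg in ("m1", "m2", "m3"):
    dispatch(msg, selectors, lambda name, m: log.append((name, m)))
```

In this run "audit" sees all three messages, while "worker-1" and "worker-2" alternate.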

In order to move a message, a selector needs sufficient credit. It gets credit from the destination queue, which tells it, "ready to accept N more messages". There are two common cases: infinite credit, where messages are delivered without any response from the destination, and single-step, where messages are moved one at a time, with confirmation for each message from the destination.

So, if a queue has selectors but none has sufficient credit, messages will accumulate. When any of the selectors receives credit, it will start to move messages again.
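
A rough sketch of how credit gates MOVE, assuming a hypothetical `MoveSelector` class and `drain` helper (neither is Xump API), and using -1 to stand for infinite credit:

```python
from collections import deque

INFINITE = -1  # assumed marker for infinite credit

class MoveSelector:
    def __init__(self, name, credit=0):
        self.name = name
        self.credit = credit
        self.delivered = []

    def can_move(self):
        return self.credit == INFINITE or self.credit > 0

    def move(self, message):
        if self.credit != INFINITE:
            self.credit -= 1      # each move consumes one unit of credit
        self.delivered.append(message)

def drain(queue, selectors):
    """Move queued messages while any selector has credit; return count moved."""
    moved = 0
    while queue:
        ready = next((s for s in selectors if s.can_move()), None)
        if ready is None:
            break                 # no credit anywhere: messages accumulate
        ready.move(queue.popleft())
        moved += 1
    return moved

queue = deque(["m1", "m2", "m3"])
consumer = MoveSelector("consumer", credit=0)
stalled = drain(queue, [consumer])   # nothing moves; the queue keeps all three
consumer.credit += 2                 # "ready to accept 2 more messages"
resumed = drain(queue, [consumer])   # two messages move, one remains
```

Single-step delivery is the same thing with the destination granting one unit of credit per confirmation.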

We could use credit for COPY as well, but this has some unpleasant side effects. First, it causes source queues to fill up when destinations are slow in processing messages. Second, it forces us to treat selectors as independent cursors on the queue, which makes O(log n) routing impossible or at least very difficult. And it does not appear to be necessary for the use cases we see: COPY is typically used for high-volume parrot (pub-sub) patterns where messages are never acknowledged, and where messages are pushed towards the destination as rapidly as possible, and dropped when that cannot be done, rather than backing up source queues.

A selector specifies a source queue and a destination (presumably a queue). I'm not yet sure whether the destination is required, i.e. whether selectors are the terminators in the directed network, or whether queues are.

For our initial end-to-end proof, we can ignore filtering. The only natural identification for a selector is the selector itself, i.e. the concatenation of its main properties. This is not easy to work with. So, we'll number selectors per source queue and provide a way for applications to fetch selectors by index, exactly as we do for messages.

A selector therefore defines some or all of:

  • A numeric identifier.
  • A source queue.
  • A destination name.
  • A match type, which defines the selector algorithm. I'll list a few below.
  • A match argument, which is passed to the algorithm. Its meaning depends on the match type.
  • An operation, which is one of MOVE or COPY.
  • A credit, which is an integer number of messages. For "infinite" we'll use -1.
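
To make the list concrete, here is one possible shape for a selector record, as a Python dataclass. The field names, the defaults and the validation are my assumptions for illustration; the real xump_selector may look quite different.

```python
from dataclasses import dataclass

INFINITE_CREDIT = -1   # the convention above for "infinite"

@dataclass
class Selector:
    ident: int                       # numeric identifier, per source queue
    source: str                      # source queue name
    destination: str = ""            # destination name; may be left empty
    match_type: str = ""             # names the matching algorithm
    match_arg: str = ""              # argument passed to that algorithm
    operation: str = "MOVE"          # "MOVE" or "COPY"
    credit: int = INFINITE_CREDIT    # messages the destination will accept

    def __post_init__(self):
        if self.operation not in ("MOVE", "COPY"):
            raise ValueError(f"operation must be MOVE or COPY, got {self.operation!r}")
        if self.credit < INFINITE_CREDIT:
            raise ValueError("credit must be -1 (infinite) or a non-negative count")

example = Selector(ident=1, source="orders", destination="billing",
                   match_type="EQ", match_arg="invoice", operation="COPY")
```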

Selectors exist in the storage layer, attached to their source queue. It seems to make sense to hold these in the same storage layer instance as their source queue, since their lifespans are linked. If we delete a queue, we delete all selectors attached to it.

We need some mechanism to determine whether a name refers to a queue or not, and which storage layer instance that queue is in. The simplest is probably a queue name cache in the Xump engine.
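
One way such a cache could work, sketched in Python. `StubStore`, `has_queue`, `QueueNameCache` and `locate` are hypothetical names, and the sketch deliberately ignores invalidation when a queue is deleted.

```python
class StubStore:
    """Stand-in for a storage layer instance; it only answers 'is this queue here?'."""
    def __init__(self, queues):
        self._queues = set(queues)

    def has_queue(self, name):
        return name in self._queues

class QueueNameCache:
    def __init__(self, stores):
        self._stores = list(stores)
        self._cache = {}              # queue name -> store that holds it

    def locate(self, name):
        """Return the store holding queue `name`, or None if it is not a queue."""
        if name in self._cache:
            return self._cache[name]
        for store in self._stores:    # cache miss: interrogate each store
            if store.has_queue(name):
                self._cache[name] = store   # only positive answers are cached
                return store
        return None

ram = StubStore({"orders"})
disk = StubStore({"archive"})
cache = QueueNameCache([ram, disk])
```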

For now, we'll define only two match types, "EQ", which does a literal string comparison between the message address and the argument, and the empty match type, which means "all messages". These correspond to the AMQP "direct" and "fanout" exchange types.
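
The two match types reduce to very little code. A sketch, with a hypothetical `selector_matches` function:

```python
def selector_matches(match_type, match_arg, address):
    """Decide whether a selector accepts a message with the given address."""
    if match_type == "":
        return True                   # empty match type: all messages
    if match_type == "EQ":
        return address == match_arg   # literal string comparison
    raise ValueError(f"unknown match type: {match_type!r}")

fanout_hit = selector_matches("", "", "any.address")
direct_hit = selector_matches("EQ", "invoice", "invoice")
direct_miss = selector_matches("EQ", "invoice", "Invoice")
```

Rejecting unknown match types is my choice for the sketch; a real implementation might hand them to an extension instead.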

From the above, we need to make:

  • A queue name cache that holds the set of all known queues and that can interrogate storage layers to know whether a queue is defined or not.
  • A xump_selector class that lets us work with selector objects held in the storage layer. This would work similarly to xump_message.
  • New methods in the store API to handle selectors (create, fetch, delete).
  • Implementation of these methods in the RAM store.
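
As a rough idea of what the selector methods might look like in a RAM store, here is a toy version in Python. The class and method names (`RamStore`, `selector_create`, `selector_fetch`, `selector_delete`) are invented for this sketch and are not the actual store API. Selectors are held per source queue and fetched by index, and deleting a queue drops its selectors.

```python
class RamStore:
    """Toy in-memory store in which each queue owns its selectors."""

    def __init__(self):
        self._selectors = {}   # queue name -> ordered list of selector dicts

    def queue_create(self, name):
        self._selectors.setdefault(name, [])

    def queue_delete(self, name):
        # Deleting a queue deletes every selector attached to it.
        self._selectors.pop(name, None)

    def selector_create(self, source, destination, match_type="",
                        match_arg="", operation="MOVE", credit=-1):
        selectors = self._selectors[source]   # KeyError if the queue is unknown
        selectors.append({
            "source": source, "destination": destination,
            "match_type": match_type, "match_arg": match_arg,
            "operation": operation, "credit": credit,
        })
        return len(selectors) - 1             # index within the source queue

    def selector_fetch(self, source, index):
        selectors = self._selectors.get(source, [])
        return selectors[index] if 0 <= index < len(selectors) else None

    def selector_delete(self, source, index):
        del self._selectors[source][index]

store = RamStore()
store.queue_create("orders")
first = store.selector_create("orders", "billing", match_type="EQ", match_arg="invoice")
second = store.selector_create("orders", "audit", operation="COPY")
```

Note that in this toy version deleting a selector shifts the indexes of the ones after it, which a real store would probably want to avoid.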

This still won't let us do message routing, but we're getting closer.
