2009-06-08
I'm removing the message_update method because it does not make sense. We define a message fully when we create it, and all state needed for routing should be held in the routing layer, not the storage layer.
The message_delete method needs to be idempotent, and thus safe to call no matter what the state of the store. So before we delete the RAM message, we need to check that the queue still exists. We should be robust against stupid things like fetching a message, deleting the queue, then trying to delete the message. That means we do not need to worry about the order of a teardown. The cost is an extra lookup, which can eventually be optimised away.
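Here is a minimal sketch of what an idempotent delete could look like for a RAM store. The ram_store_t and ram_queue_t types, the fixed-size arrays, and the ram_message_delete name are all invented for illustration; this is not the actual xump storage code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_QUEUES    4
#define MAX_MESSAGES  8

/*  Hypothetical RAM store: names and layout are illustrative only  */
typedef struct {
    bool     exists;                    /*  False once the queue is deleted  */
    bool     used [MAX_MESSAGES];       /*  Slot holds a live message?       */
    uint32_t id   [MAX_MESSAGES];       /*  Queue-unique message ID          */
} ram_queue_t;

typedef struct {
    ram_queue_t queues [MAX_QUEUES];
} ram_store_t;

/*  Delete a message; a no-op if the queue or the message is already gone.
    Returns true only when a message was actually removed.                */
static bool
ram_message_delete (ram_store_t *store, size_t queue_nbr, uint32_t message_id)
{
    assert (store);
    if (queue_nbr >= MAX_QUEUES)
        return false;

    ram_queue_t *queue = &store->queues [queue_nbr];
    if (!queue->exists)                 /*  The extra lookup: queue gone?    */
        return false;

    for (size_t slot = 0; slot < MAX_MESSAGES; slot++) {
        if (queue->used [slot] && queue->id [slot] == message_id) {
            queue->used [slot] = false;
            return true;
        }
    }
    return false;                       /*  Already deleted: still safe      */
}
```

Calling this twice for the same message, or after the queue has been deleted, simply returns false, so teardown order stops mattering.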
Message properties (fields): we do not need to manipulate these as objects; they are created and used but never (or rarely) modified for existing messages. Thus, the fastest and simplest data structure will be an env-type block (name=value<nul>…).
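To make the env-type block concrete, here is a rough sketch of a read-only lookup. It assumes the block ends with an empty string (a double nul), as environment blocks usually do; that terminator and the env_block_get name are assumptions for this example, not a settled format.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*  Look up 'name' in a block laid out as "name=value\0name=value\0\0".
    The empty string that ends the block is an assumption of this sketch.
    Returns a pointer to the value inside the block, or "" if not found.  */
static const char *
env_block_get (const char *block, const char *name)
{
    assert (block && name);
    size_t name_len = strlen (name);

    while (*block) {
        /*  Match the whole name, followed immediately by '='  */
        if (strncmp (block, name, name_len) == 0 && block [name_len] == '=')
            return block + name_len + 1;
        block += strlen (block) + 1;    /*  Skip to the next entry  */
    }
    return "";
}
```

The lookup is a linear scan with no allocation and no copying, which fits data that is written once and then only read.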
I'm going to call message properties "headers", to match the SMTP/HTTP semantics. Headers are name/value pairs, where both name and value are C strings (i.e. they cannot contain null characters). We need to be able to store the headers for a message when we create a new message, and we need to be able to access (but rarely modify) the headers for an existing message.
To implement this, we'll make a new xump_headers class that handles header fields. A xump_message will contain a xump_headers object, created opportunistically, since there is no point creating xump_headers objects we don't need. Here is the test code for xump_headers:
    xump_headers_t
        *headers;

    headers = xump_headers_new ();
    xump_headers_set (headers, "a", "one");
    xump_headers_set (headers, "b", "two");
    xump_headers_set (headers, "c", "three");
    //  Setting an existing name replaces its value
    xump_headers_set (headers, "a", "value");
    assert (streq (xump_headers_get (headers, "a"), "value"));
    //  A missing header reads as the empty string
    assert (streq (xump_headers_get (headers, "z"), ""));
    xump_headers_destroy (&headers);
I've used reference counting for the xump_headers class, so that for storage layers that keep the headers in memory, there is no copying. Fetching a message just fetches a reference to the headers. When the last reference is deleted, the headers are also destroyed.
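The reference-counting idea can be sketched as follows. The refs_t type and the refs_new, refs_link and refs_unlink functions are hypothetical stand-ins, not the real xump_headers interface, and the sketch is single-threaded; a real implementation would need atomic counters if references cross threads.

```c
#include <assert.h>
#include <stdlib.h>

/*  Hypothetical reference-counted object; not the real xump_headers layout  */
typedef struct {
    int links;                          /*  Number of live references  */
    /*  Header data would live here  */
} refs_t;

static int refs_destroyed = 0;          /*  Counts real frees, for illustration  */

static refs_t *
refs_new (void)
{
    refs_t *self = malloc (sizeof (refs_t));
    if (self)
        self->links = 1;                /*  The creator holds the first reference  */
    return self;
}

/*  Fetching takes a new reference instead of copying  */
static refs_t *
refs_link (refs_t *self)
{
    assert (self && self->links > 0);
    self->links++;
    return self;
}

/*  Drop one reference and null the caller's pointer; free on the last one  */
static void
refs_unlink (refs_t **self_p)
{
    assert (self_p);
    refs_t *self = *self_p;
    if (self) {
        if (--self->links == 0) {
            free (self);
            refs_destroyed++;
        }
        *self_p = NULL;
    }
}
```

Nulling the caller's pointer on unlink means a second unlink through the same pointer is harmless.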
We could eventually use the same mechanism to avoid copying message data, by holding it in buckets, which are also reference counted. The break-even point seems to be at about 2,300 bytes, above which copying costs more than allocating a bucket using the fastest 'direct' allocator (the ALLOCATOR=direct runtime setting for Base2, which bypasses the memory debugging framework). So in the worst case, we can use buckets for messages of 2.3K and above. But if we assume each message is read at least once, and so copied at least once more, then the break-even point is lower. Using buckets will also eliminate duplicate message content for messages that are on more than one queue (at least, for RAM-based queues).
We'll come back to buckets another time, using them to reduce copying work for messages of 1K and above in size. The current API is pretty fast already. Creating and deleting 1M messages takes about 1 second if I switch to the direct allocator. When compiled in release mode, I expect this to go twice as fast.
2009-06-06
We identify messages using a numeric ID that is unique within a queue. It's the storage layer's responsibility to provide new IDs. We'll use an unsigned 32-bit ID and ignore rollover. If a single queue has more than 2^32 messages, the older ones won't be accessible. Fair enough.
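As a minimal sketch of ID allocation with rollover, assuming a simple per-queue counter (id_source_t and id_source_next are made-up names for this example):

```c
#include <assert.h>
#include <stdint.h>

/*  Hypothetical per-queue ID counter owned by the storage layer  */
typedef struct {
    uint32_t next_id;
} id_source_t;

/*  Hand out the next ID. Unsigned arithmetic wraps modulo 2^32, so after
    UINT32_MAX the counter silently restarts at 0.                        */
static uint32_t
id_source_next (id_source_t *self)
{
    assert (self);
    return self->next_id++;
}
```

Because unsigned overflow is well defined in C, ignoring rollover costs nothing; the only consequence is that a wrapped ID may collide with a very old message that is still on the queue.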
Thinking: xump_message and xump_queue objects should have a void * context block which is managed by the storage layer. This requires that the constructors (at least) and destructors are called from within the storage layer. We can constrain destruction of the void * block to a single icl_mem_free call.
2009-06-05
The API is breaking one of the rules of software design: it's confusing. I find myself looking at the code to figure out where I put the message creation code. This ain't good. So, I'm going to change this again, so that we create messages like we do queues, with a 'message_create' method. This is then a 'create' method on the xump_message class, which invokes the storage portal method 'message_create'.
So what we have is:
- xump_queue.icl: create, fetch, delete (no update possible on queues).
- xump_message.icl: create, fetch, update, delete.
- xump_store.icl: queue_create, queue_fetch, queue_delete, message_create, message_fetch, message_update, message_delete.
Sorry for the detour. Sometimes we need to try stuff in order to realize that it's silly.
2009-06-04
The storage layer must be entirely ignorant of how messages flow into queues, and out again. This is handled by the engine. What the storage layer does is store messages, provide access to them, and delete them. In our current design, the head of the queue is stable insofar as the only party removing messages is the engine itself.
Access to messages is thus synchronous, i.e. the engine polls the storage layer when it needs messages. This means that we can browse queues using an index that iterates from the head of the queue, i.e. 0 = oldest message, 1 = second oldest, etc. We need methods like this, as we previously sketched:
- message_fetch (message, index) - fetch message object by index
- message_update (message) - update message from object
- message_delete (message) - delete message
Perhaps the best identifier for a message is a queue-unique id, rather than the index. Thus the engine can manipulate messages independently. In storage layers where the real message is large (e.g. a disk file), the message object acts as a reference. In storage layers where the real message is small, the message object would hold the denormalized body data.
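The difference between browsing by index and addressing by ID can be sketched like this. The id_queue_t type and both functions are hypothetical, and the queue is just an array of IDs kept oldest first.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define QUEUE_MAX 8

/*  Hypothetical queue holding message IDs, oldest first  */
typedef struct {
    uint32_t ids [QUEUE_MAX];
    size_t   count;
} id_queue_t;

/*  Browse by position: 0 = oldest. Returns false past the tail.  */
static bool
id_queue_fetch_by_index (const id_queue_t *self, size_t index, uint32_t *id)
{
    assert (self && id);
    if (index >= self->count)
        return false;
    *id = self->ids [index];
    return true;
}

/*  Delete by queue-unique ID, closing the gap so order is preserved  */
static bool
id_queue_delete (id_queue_t *self, uint32_t id)
{
    assert (self);
    for (size_t i = 0; i < self->count; i++) {
        if (self->ids [i] == id) {
            memmove (&self->ids [i], &self->ids [i + 1],
                     (self->count - i - 1) * sizeof (uint32_t));
            self->count--;
            return true;
        }
    }
    return false;
}
```

After the head is deleted, every index shifts down by one, while the IDs of the remaining messages stay the same. That is why an ID makes a better handle than an index once the engine holds on to messages between calls.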