Introducing Hazelcast ... a new way to cluster Openfire!

I checked out the latest SVN changes to pubsub and ran into an issue.

Even though XEP-0060 does say that when an item is published to a node, it MUST have a unique key, the Openfire pubsub implementation did in fact check for duplicates and remove them. This was a very useful feature, as it enables the use of Openfire pubsub as an application datastore with realtime replication. I use a backbone.js xmpp datastore that works well with Openfire.

The problem is in org.jivesoftware.openfire.pubsub.LeafNode.java

    // Check and remove any existing items that have the matching ID,
    // generated ID's won't match since we already checked.
    PublishedItem duplicate = itemsByID.get(newItem.getID());
    if (duplicate != null)
    {
        removeItem(findIndexById(duplicate.getID()));
    }

This code is gone and now data integrity exception errors occur because of the attempt to write a duplicate database record.

I appreciate the code is being optimized for clustering, but is there any chance of keeping backward compatibility by reusing node item ids? I do not wish to fork the code.

Hi Dele -

I agree … we moved some of the persistence logic around during the refactor and it seems this important de-duplication step was dropped. We can definitely fold it back in - no need to fork. I will take a closer look and reply here shortly.

Thanks,

Tom

I will take a look as well, since I actually removed that.

What is supposed to happen when a duplicate ID occurs is that the original record gets overwritten. There should be no error, since this is a valid use case. This is how items get updated in pubsub.
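The overwrite behaviour described here can be illustrated with a minimal in-memory sketch. The `ItemStore` class and its methods are hypothetical names used only for illustration; they are not Openfire's API, and the real implementation persists to a database rather than to a map.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: publishing with an existing item ID overwrites the old item.
class ItemStore {
    // Item ID -> payload, kept in publish order.
    private final Map<String, String> itemsById = new LinkedHashMap<>();

    // Publishing never fails on a duplicate ID; the old item is replaced.
    void publish(String itemId, String payload) {
        itemsById.remove(itemId);       // drop the old copy so the new one moves to the end
        itemsById.put(itemId, payload); // store the new payload under the same ID
    }

    String get(String itemId) {
        return itemsById.get(itemId);
    }

    List<String> idsInPublishOrder() {
        return new ArrayList<>(itemsById.keySet());
    }
}
```

Republishing item "a" after item "b" leaves one copy of "a", holding the newer payload and ordered after "b".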

The original code was changed since it relied on every persisted item in every node being in memory, hence the reason I logged a Jira task about this as a memory leak and refactored how it worked as a precursor to the clustering work. So the code in question was removed before clustering was actually started, since I changed how pubsub actually did its in memory caching and persistence.

Thanks :slight_smile:

I noticed the memory map was gone, but could not work out how to discover duplication without it. I’ll wait for you to fix it.

I have been running 3.7.2 alpha for a while for some of my projects, but because I needed CORS, I have been using Pubsub mostly in a custom build of 3.7.1 and had just returned to 3.7.2 trunk for Pubsub after the CORS fix went in.

The changes look very good and should be a performance boost for Openfire Pubsub. Excellent work

OK - committed 13303 to address this issue (hopefully). We basically just delete the duplicate item from the cache(s) and DB before writing the new one in its place. Please give it a whirl and let me know if you run into any other surprises.

Thanks Tom,

I did patch my copy of the code to do that and also swapped the position of the batch deletions and additions code.

I am inclined to go with Rob’s suggestion to perform a direct DB update instead of a delete/add. It sounds more efficient. But then I am not a DB expert. As we are performing the SQL operations in batch, maybe it does not matter.

The changes work fine. Thank you.

The overwrite I spoke of would be implemented exactly how Tom has done it, a delete and add in the same transaction. Doing an update would require a search first to find out if the item already existed anyway. Much simpler to just delete them all and then add them again as batch operations.
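A rough JDBC sketch of the delete-then-add flush in a single transaction might look like the following. The table name, column names, and the `ItemFlusher` class are assumptions made for illustration; they are not taken from Openfire's schema or from revision 13303.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Hypothetical sketch: table and column names are assumptions, not Openfire's schema.
class ItemFlusher {
    static final String DELETE_ITEM =
        "DELETE FROM pubsubItem WHERE nodeID = ? AND id = ?";
    static final String ADD_ITEM =
        "INSERT INTO pubsubItem (nodeID, id, payload) VALUES (?, ?, ?)";

    // Each String[] holds {nodeID, itemID, payload}.
    static void flush(Connection con, List<String[]> items) throws SQLException {
        boolean oldAutoCommit = con.getAutoCommit();
        con.setAutoCommit(false); // one transaction covers both batches
        try {
            // Batch 1: delete any stored item with the same ID (a no-op if none exists).
            try (PreparedStatement del = con.prepareStatement(DELETE_ITEM)) {
                for (String[] item : items) {
                    del.setString(1, item[0]);
                    del.setString(2, item[1]);
                    del.addBatch();
                }
                del.executeBatch();
            }
            // Batch 2: insert the new items.
            try (PreparedStatement add = con.prepareStatement(ADD_ITEM)) {
                for (String[] item : items) {
                    add.setString(1, item[0]);
                    add.setString(2, item[1]);
                    add.setString(3, item[2]);
                    add.addBatch();
                }
                add.executeBatch();
            }
            con.commit();
        } catch (SQLException e) {
            con.rollback(); // the old rows survive if either batch fails
            throw e;
        } finally {
            con.setAutoCommit(oldAutoCommit);
        }
    }
}
```

This sketch assumes each item ID appears at most once per flush; if the same ID could be queued twice in one batch, the list would need to be collapsed to the latest copy first.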

Looking at 13303 I wonder how good it is to use two transactions or the new line “itemsToDelete.addLast(item); // delete stored duplicate (if any)”. Now there are two places where items are added to the itemsToDelete list. This makes things much more complicated.

a) Items which were added using removePublishedItem() should log an error and rollback the transaction.

b) Items which were added by savePublishedItem() should not log an error and there is also no need to rollback the transaction.

My suggestion:

  1. remove the “itemsToDelete.addLast(item); // delete stored duplicate (if any)” line

  2. Leave the 1st transaction (delList) as it is. I assume that it is still needed. (?)

  3. Use the 2nd transaction to delete the items on the addList and then add the items. Do not log delete errors or rollback the transaction if deleting fails. If the insert fails then rollback everything to make sure that the old nodes do still exist. This is what most users expect when their update fails, the old value should still exist and not be deleted - hopefully this matches the pubsub spec.

PS: In case of exceptions I prefer to log the SQL statement and a reference to the affected item - this makes resolving errors much easier. E.g. log.error(ADD_ITEM + " (" + item.getNode().getService().getServiceID() + ", " + … + ") " + sqle.getMessage(), sqle);

I don’t think there is really any issue with the adding of the published item to the itemsToDelete, but the delete and add should definitely be in the same transaction. This is the only way to guarantee no duplicate id problems during the add when operating in a cluster. This would also ensure that any existing data remains untouched, since the delete and add are now part of the same transaction, thus if either fails the database will remain untouched.

I am about to make some changes due to a different transaction-related issue that will fix this as well (since the flush will be part of another operation).

As for what to do if there is an exception, I am not sure. Logically, the only way this can fail is if we lose connection to the db or hit some other system-related error, which makes the rollback a moot point. The only thing then is determining whether the service itself should be shut down, since it can no longer function properly. Exceptions would of course be logged.

The fact is, if an exception occurs when we do a flush, we will lose any new items that have been added, up to the size of the cache. The only way to prevent this from happening is to set the cache size to 1, then the item will get persisted every time it is published. Even for that case we need to change the error handling during the flush so it can get reported back to the publisher.

Yes, a single transaction is certainly the way to go here - thanks for catching/fixing that. What I intended to do was setup two batch updates (one for the deletes, another for the adds) to keep the DB I/O reasonably efficient, but both should be in the same transaction context.

In the case of an exception and rollback, I suppose we could attempt to reinsert the failed items into the pending items caches (read and write), in addition to recording the error in the log. However, we would also need to maintain the original order of the items, possibly deal with multiple retries, cache overflows, etc. I’m not sure if that’s better or worse … opinions?
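To make the ordering concern concrete, here is a minimal sketch of putting a failed batch back in front of newer items. `PendingItems` and its methods are hypothetical names, not part of Openfire, and the sketch deliberately ignores retry limits and cache overflow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of re-queuing a failed batch ahead of newer items.
class PendingItems {
    private final Deque<String> pending = new ArrayDeque<>();

    // New items always go to the back of the queue.
    void publish(String itemId) {
        pending.addLast(itemId);
    }

    // Removes and returns everything currently queued, oldest first.
    List<String> drain() {
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }

    // Puts a failed batch back at the front, preserving its original order.
    void requeue(List<String> failedBatch) {
        for (int i = failedBatch.size() - 1; i >= 0; i--) {
            pending.addFirst(failedBatch.get(i));
        }
    }

    // Copy of the current queue contents, oldest first.
    List<String> snapshot() {
        return new ArrayList<>(pending);
    }
}
```

Walking the failed batch backwards while calling `addFirst` keeps the oldest item at the head, so items published during the failed flush stay behind it.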

My XMPP/XEP pubsub knowledge is very limited, as I have read only parts of the XEP. Anyhow, deleting nodes is a completely different thing than adding nodes, so I would not mix these. Likely it’s not possible to try to delete a non-existing node, so there will never be an exception during deletion. But we have all seen unbelievable things that the brain explains away to keep us from going mad. That’s why I would like to separate these two tasks.

@ “The fact is, if an exception occurs when we do a flush, we will lose any new items that have been added, up to the size of the cache.” I wonder whether this is in sync with the XEP. May be another reason not to mix deletes and updates. And for me it looks like one wants to set the write cache size to 1 (without yet notifying the user, while this would be nice). Actually Openfire should be able to recover from database exceptions and power losses without losing information.

So one does not need to care about reinserting failed items, multiple retries, … I thought of writing all inserts and deletes to the hard disc so one can re-execute them but keeping these things synchronized with the database within a cluster may be very hard. For me it seems that no write cache or a cache size of 1 is the much better approach.

Inserting 1000 rows in one transaction is usually faster than getting 1000 database connections for single row transactions - but maybe the performance impact is acceptable if one can simplify the code.

Anyhow deleting nodes is a completely different thing than adding nodes. So I would not mix these. Likely it’s not possible to try to delete an non-existing node, so there will never be an exception during deletion.
We need to straighten out the terminology here: we are talking about items, not nodes (nodes are what the items are published to, the messaging queue if you will). Anyway, functionally speaking, they are obviously different, but that is not really the issue here. The code in question is really just a means of flushing the in-memory cache to the persistent store, and in that case we are actually doing the same db operations. That being said, at the bottom you will see that I propose we change that to something in line with what you are suggesting.

@ “The fact is, if an exception occurs when we do a flush, we will lose any new items that have been added, up to the size of the cache.” I wonder whether this is in sync with the XEP.
This sort of thing isn’t covered in the spec as this is an implementation detail. Of course we don’t want to ever lose data, but I am not sure that can be accomplished with a simple in memory cache and generic database access as OF currently supports (I don’t claim to be an expert in this regard though). We can, of course, set the cache size to 0 as I said earlier, thus forcing db access on every publish and delete. This would probably work fine for many cases actually, and maybe should be the default. It just becomes a problem when we get into massive amounts of publishing as the IO will become a bottleneck. This would then put the onus on the system owner to configure the cache with the full knowledge that the boost in performance comes at a cost.

To be honest, when this refactoring occurred, the intent was to make it much more scalable than it was before by eliminating the memory leak and of course making it clusterable. With regards to the guarantee of 0 data loss, it wasn’t really taken into consideration. In that respect the new code happens to be better than the old (with 0 cache size), but the current defaults suffer the same problem as the old version with respect to data loss.

So, as I have been writing this reply, I have been thinking and this is what I propose.

  1. Have two separate cache size properties, one for inserts and a separate one for deletes. I propose this because I would suspect that if someone will want to use a cache to improve performance, the vast majority of use cases would only require it on the publish, not the delete. This also means of course that flushing would be separate operations for delete and add.
  2. We set the default cache size to 0, thus forcing a db call for every publish and delete, meaning 0 possibility of dataloss.
  3. Refactor the persistence calls for publish and delete to throw an exception to the caller (the node) so an appropriate error can be relayed back to the user on the failed request.
  4. Document the side effect of setting the cache size properties.
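The proposal above could be sketched roughly as follows. All names here (`BufferedItemWriter`, `ItemDao`, `PersistenceException`) are hypothetical and not part of Openfire. A cache size of N is taken to mean "hold at most N unflushed items", so the default of 0 writes through on every call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these names come from Openfire.
class PersistenceException extends Exception {
    PersistenceException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Stand-in for the database layer.
interface ItemDao {
    void insertAll(List<String> itemIds) throws Exception;
    void deleteAll(List<String> itemIds) throws Exception;
}

class BufferedItemWriter {
    private final ItemDao dao;
    private final int insertCacheSize; // max unflushed publishes
    private final int deleteCacheSize; // max unflushed deletes
    private final List<String> pendingInserts = new ArrayList<>();
    private final List<String> pendingDeletes = new ArrayList<>();

    // Default: both sizes 0, so every publish and delete hits the database.
    BufferedItemWriter(ItemDao dao) {
        this(dao, 0, 0);
    }

    BufferedItemWriter(ItemDao dao, int insertCacheSize, int deleteCacheSize) {
        this.dao = dao;
        this.insertCacheSize = insertCacheSize;
        this.deleteCacheSize = deleteCacheSize;
    }

    synchronized void publish(String itemId) throws PersistenceException {
        pendingInserts.add(itemId);
        if (pendingInserts.size() > insertCacheSize) {
            flushInserts();
        }
    }

    synchronized void delete(String itemId) throws PersistenceException {
        pendingDeletes.add(itemId);
        if (pendingDeletes.size() > deleteCacheSize) {
            flushDeletes();
        }
    }

    // Flushes are separate operations; failures propagate to the caller.
    synchronized void flushInserts() throws PersistenceException {
        List<String> batch = new ArrayList<>(pendingInserts);
        pendingInserts.clear();
        try {
            dao.insertAll(batch);
        } catch (Exception e) {
            throw new PersistenceException("insert failed for " + batch, e);
        }
    }

    synchronized void flushDeletes() throws PersistenceException {
        List<String> batch = new ArrayList<>(pendingDeletes);
        pendingDeletes.clear();
        try {
            dao.deleteAll(batch);
        } catch (Exception e) {
            throw new PersistenceException("delete failed for " + batch, e);
        }
    }
}
```

With the default sizes, the caller that handles the publish sees the `PersistenceException` directly and can turn it into an error reply. The sketch does not address ordering between a buffered delete and a later publish of the same item ID, which a real implementation with non-zero sizes would have to consider.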

As a possible future enhancement, we could also allow the persistence manager to maintain its own pool of connections, either create them outright or “borrow” them from the db connection manager. I don’t think there are any other components in OF that have the same potentially heavy db access needs as pubsub may require. Thus it makes sense that it could have its own dedicated db connections so it won’t have to compete with other modules/components.


Sorry for the item / node confusion, you are of course right.

Currently it’s hard to tell how this will impact the performance of the server and the database. Also the cluster talk does need some time. So there may be the need to have small local queues/caches or other asynchronous operations to avoid a bottleneck.

One could write a small test program to measure SQL performance for one 1000-row transaction versus 1000 single-row transactions (without reads). This should help to decide whether one can completely remove the cache or set the size to 0.
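Such a test program might be sketched along these lines. The `bench_item` table and the `InsertBenchmark` class are assumptions for illustration; the table would need to exist (and be emptied between runs) in whatever database is being measured. Note that this sketch reuses one connection for the small transactions, so it does not include the cost of obtaining a connection per transaction.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical benchmark sketch; the table "bench_item" is an assumption.
class InsertBenchmark {
    static final String INSERT =
        "INSERT INTO bench_item (id, payload) VALUES (?, ?)";

    // One transaction containing `rows` batched inserts. Returns elapsed nanoseconds.
    static long oneLargeTransaction(Connection con, int rows) throws SQLException {
        long start = System.nanoTime();
        con.setAutoCommit(false);
        try (PreparedStatement ps = con.prepareStatement(INSERT)) {
            for (int i = 0; i < rows; i++) {
                ps.setString(1, "big-" + i);
                ps.setString(2, "payload");
                ps.addBatch();
            }
            ps.executeBatch();
        }
        con.commit(); // single commit for all rows
        return System.nanoTime() - start;
    }

    // `rows` transactions of one insert each. Returns elapsed nanoseconds.
    static long manySmallTransactions(Connection con, int rows) throws SQLException {
        long start = System.nanoTime();
        con.setAutoCommit(false);
        try (PreparedStatement ps = con.prepareStatement(INSERT)) {
            for (int i = 0; i < rows; i++) {
                ps.setString(1, "small-" + i);
                ps.setString(2, "payload");
                ps.executeUpdate();
                con.commit(); // one commit per row
            }
        }
        return System.nanoTime() - start;
    }
}
```

Running both methods with 1000 rows against the target database, several times each, would give a first idea of how much the batching actually buys.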

@“3. … so an appropriate error can be relayed back to the user on the failed request.” I don’t think that this is really possible. There is no error defined for such a failure and the user did likely already receive an OK message.

But there is also the flush timer; maybe it is fine to set it to one second. This would not eliminate the potential data loss, but in most cases it is acceptable to lose the last second. Is this a reasonable alternative?

So we could also keep the current cache sizes.

The performance tests are a great idea. I actually have a small testing project I was using when I initially refactored pubsub. I was mainly aiming at the memory usage, but it will also work quite well for testing throughput.

Even without testing though, I can’t imagine that caching will make any difference in a use case where there are only a couple of items published per second, or less. Now when we get into 10s or 100s per second, that is a completely different story. In any case, some metrics would be valuable, I will have to see what I can do.

The error case should be easy enough, as the db call becomes part of the flow of processing the publish anyway, thus its failure would bubble up and can be reported as an **internal-server-error** to the publisher. The db call is already part of the processing chain if the item being published actually triggers a flush by exceeding the cache limit.

Without the cache, the timer would not be required, so it would simply not be turned on. With a cache, having an extremely short timer would cause the same problem as having no cache: overly frequent db access. In a constantly busy system, the timer is actually not needed, as we would be constantly flushing due to the size restriction. I think the only use case where the timer would be needed is when we have bursts of traffic, where the burst would trigger flushing due to size, but when the burst is over and there are still items in the cache, then the timer would take care of persisting them (and any subsequent low volume publishing between bursts). This is assuming that no cache is the best option in a low volume system.
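The interplay between size-triggered and timer-triggered flushing can be sketched like this. `FlushPolicy` is a hypothetical class, not Openfire code, and the clock is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of size- and age-triggered flushing.
class FlushPolicy {
    private final int maxItems;        // flush when more than this many items are pending
    private final long maxAgeMillis;   // flush leftovers once the oldest is this old
    private final Consumer<List<String>> flusher;
    private final List<String> pending = new ArrayList<>();
    private long oldestPendingAt;

    FlushPolicy(int maxItems, long maxAgeMillis, Consumer<List<String>> flusher) {
        this.maxItems = maxItems;
        this.maxAgeMillis = maxAgeMillis;
        this.flusher = flusher;
    }

    // Called on every publish; during a burst the size limit does all the flushing.
    synchronized void publish(String itemId, long nowMillis) {
        if (pending.isEmpty()) {
            oldestPendingAt = nowMillis;
        }
        pending.add(itemId);
        if (pending.size() > maxItems) {
            flush();
        }
    }

    // Called periodically by a timer; persists what a burst left behind.
    synchronized void onTimer(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - oldestPendingAt >= maxAgeMillis) {
            flush();
        }
    }

    private void flush() {
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        flusher.accept(batch);
    }
}
```

In a running server, `onTimer` could be driven by something like a `ScheduledExecutorService` passing `System.currentTimeMillis()`. With a cache size of 0 every publish flushes immediately, so the timer never finds anything to do.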

I have attached to this post a slightly modified version of Hazelcast that is backwards compatible with Openfire 3.6.4 and Openfire 3.7.1. I had to rename the plugin and some classes.

Unzip and copy the clustering.jar file to your plugins folder.
hazelcast-364-1.0.0.zip (1680594 Bytes)

I’ve just taken the plugin for a spin and it seems to be in good working condition. Tried session replication, presence changes, MUC, pubsub, all worked across nodes.

Now, the question is, is the version of the plugin that works with Openfire 3.7.1 available from an official source?

What qualifies as an official source?

It is the same source code. I just renamed the plugin and some classes to appease Openfire 3.7.1.

@Dele Olajide, @Alex: Please tell me what you did to make this plugin work. I installed it on my 2 Openfire servers and it doesn’t show any cluster members other than the current host. Thanks in advance!

The default cluster configuration uses multicast (UDP) to discover member nodes, which can be problematic in certain deployments. If you are unable to use multicast due to your network configuration, you can configure the unicast (TCP) settings as documented in the Hazelcast plugin’s readme file:


The Hazelcast plugin uses the XML configuration builder to initialize the cluster from the XML configuration file (hazelcast-cache-config.xml). By default the cluster members will attempt to discover each other via multicast at the following location:

  • IP Address: 224.2.2.3
  • Port: 54327

Note that these values can be overridden in the plugin’s /classes/hazelcast-cache-config.xml file (via the multicast-group and multicast-port elements). Many other initialization and discovery options exist, as documented in the Hazelcast configuration docs noted above. For example, to set up a two-node cluster using well-known DNS name/port values, try the following alternative:

...
<join>
   <multicast enabled="false"/>
   <tcp-ip enabled="true">
     <hostname>of-node-a.example.com:5701</hostname>
     <hostname>of-node-b.example.com:5701</hostname>
   </tcp-ip>
   <aws enabled="false"/>
</join>
...

Hope that helps!
