Does Openfire support mobile-friendly XMPP extensions such as XEP-0198?

  1. I am not able to maintain a continuous connection using aSmack. When an aSmack client loses its connection because of network problems or Wi-Fi connectivity issues, it is still displayed as online in the Openfire console for some time. If someone sends a message to this user during that period, the message is lost: it is not delivered to the user, and it is not stored in the ofOffline message store either.

  2. I read about XEP-0198, which should solve this problem. I have read many discussions about XEP-0198 in the Ignite Realtime community, but there has been no movement on issues OF-446 and SMACK-333.

  3. Is there any news regarding XEP-0198?

After 6 months of development, XEP-0198 support for Smack is nearly finished, and Stream Management will be included in the next release, Smack 4.1 (although disabled by default). I’m not aware of anyone pursuing XEP-0198 support for Openfire and releasing it as open source. I’ve heard that there is a closed-source XEP-0198 patch for Openfire, but I can’t comment on the code quality, stability or usefulness of that patch.

If no one is willing to contribute XEP-0198 support (for free), then you either have to pay a developer to implement it, or use an XMPP server that does support XEP-0198.

How much time is required to develop XEP-0198?

Can someone other than an Openfire developer try to develop this?

If I don’t know anything about the Openfire source code, can I still understand the Openfire project and try to develop XEP-0198?

Can we integrate XEP-0198 through a plugin?

How much time is required to develop XEP-0198?

I’d say that if you want good-quality code with XEP-0198 clustering support, it would take a reasonably skilled developer with some knowledge of XEP-0198 and Openfire at least a month to implement it, and likely another month of bug hunting and fixing, ideally together with a community review of the code.

But if you are into software development, you know that such estimates are comparable to blindly throwing darts at possible answers to this question.

Can someone other than an Openfire developer try to develop this?
Everyone is free to try and run their own experiments.

If I don’t know anything about the Openfire source code, can I still understand the Openfire project and try to develop XEP-0198?

Of course you can try; whether it succeeds depends on your skill level and how much time you are willing to spend on it.

Can we integrate XEP-0198 through a plugin?
Unlikely with the current way plugins work, but it may not be impossible. OTOH, experience shows that XEP-0198 code works best as part of the core functionality.

Does the Openfire business edition support this feature?

Does the Openfire business edition support this feature?

There is no such thing as a business edition of Openfire.

So why is the XEP-0198 source code closed source?

Don’t know; you would have to ask the person who wrote it, if it really exists. I’ve only heard of it from someone who has heard of it (and so on). My guess is that they want money for their work on this.

Hi All,

I just want to share how I fixed this problem. To be clear, I have not implemented the XEP-0198 standard; I developed a trick that works fine for me.

Here is what I did.

I developed a packet interceptor. When a packet of interest reaches the Openfire server, this is what I do:

  1. First of all, I check whether the user is online. If the user is not online, I let Openfire store the message in its offline table.

  2. If the user is online, I do this:

    // Keep a copy of the packet until the recipient acknowledges it.
    var pencoded = java.net.URLEncoder.encode(packet.toXML(), "UTF-8");
    var millis = org.jivesoftware.util.StringUtils.dateToMillis(new java.util.Date());
    var pid = java.net.URLEncoder.encode(packet.getID().toString(), "UTF-8");
    sendMessageToQueue(pencoded, millis, pid);

    // Tell the sender that the server has received the message.
    if (packet.getFrom() != null && packet.getID() != null) {
        var iq = new org.xmpp.packet.IQ();
        iq.setType(org.xmpp.packet.IQ.Type.result);
        iq.setChildElement("ack", "mynamespace").addText(packet.getID());
        iq.setTo(packet.getFrom());
        log.info("myFrom:" + packet.getFrom().toString());
        var iqRouter = server.getIQRouter();
        iqRouter.route(iq);
    }

The first thing this code does is store the message in a database table called MessageQueue. It then sends an IQ carrying the message ID back to the sender (type result in the code above). The client knows how to handle this IQ, so it knows that the message has reached the server.

On the server side, to handle the fact that a client can lose its connection but still be seen as ONLINE by the server for a while, I do this:

    if (!incoming && !processed) {
        // Only react to ack IQs: type "set" and containing the client's tag.
        if (packet.toString().indexOf("my tag") > 0 && packet.getType() == "set") {
            // The ack's child element carries the ID of the acknowledged message.
            var id = packet.getChildElement().getText();
            log.info("AckManagementRemoveQueue Log Start");
            log.info("packet: " + packet);
            log.info("id: " + id);
            deleteMessageFromQueue(id);
            log.info("AckManagementRemoveQueue Log End");
        }
    }
    }

I check whether a packet is of type set and contains the tag sent by the client; if so, the server removes the message from its own queue. If this ack does not arrive within 1 minute, I simply put the message in the Openfire offline table (in the format Openfire can handle), so that the message is delivered when the client reconnects.

If anyone is interested in knowing more, just drop me a message and I will be more than happy to help.

I know it is not the best way, but it works like a charm for me and overcomes a big problem!
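
The ack-or-timeout logic described above can be sketched roughly as follows. This is a minimal in-memory illustration, not the actual interceptor code: the class `PendingAckQueue` and its methods are hypothetical stand-ins for the MessageQueue table, `deleteMessageFromQueue`, and the one-minute fallback to the offline store, and timestamps are passed in explicitly to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for the MessageQueue table. */
class PendingAckQueue {

    /** A queued stanza together with the time it was queued. */
    private static final class Entry {
        final String xml;
        final long queuedAtMillis;

        Entry(String xml, long queuedAtMillis) {
            this.xml = xml;
            this.queuedAtMillis = queuedAtMillis;
        }
    }

    private final Map<String, Entry> pending = new LinkedHashMap<>();
    private final long timeoutMillis;

    PendingAckQueue(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Called when a message is routed to a user who appears to be online. */
    synchronized void enqueue(String id, String xml, long nowMillis) {
        pending.put(id, new Entry(xml, nowMillis));
    }

    /** Called when the client's ack arrives; returns true if the id was still pending. */
    synchronized boolean ack(String id) {
        return pending.remove(id) != null;
    }

    /**
     * Removes and returns every stanza that has waited at least the timeout.
     * The caller would hand these to the offline store.
     */
    synchronized List<String> drainExpired(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Entry> it = pending.values().iterator();
        while (it.hasNext()) {
            Entry e = it.next();
            if (nowMillis - e.queuedAtMillis >= timeoutMillis) {
                expired.add(e.xml);
                it.remove();
            }
        }
        return expired;
    }

    synchronized int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingAckQueue queue = new PendingAckQueue(60_000);
        queue.enqueue("m1", "<message id='m1'/>", 0);
        queue.enqueue("m2", "<message id='m2'/>", 1_000);
        queue.ack("m1");                                // m1 was acknowledged in time
        System.out.println(queue.drainExpired(61_000)); // m2 timed out, goes to offline storage
    }
}
```

In a real deployment a periodic task would call something like `drainExpired` and write the returned stanzas to the offline table; how that write is done depends on the Openfire version and is not shown here.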

I like your solution :)

Is it possible to put it in an Openfire plugin for community use? I know it is not standard, but any solution is better than none.

If anyone has free time and would like to implement XEP-0198, here is the open source implementation from Tigase. It should be straightforward to incorporate this into Openfire.

/*
 * StreamManagementIOProcessor.java
 *
 * Tigase Jabber/XMPP Server
 * Copyright (C) 2004-2013 "Tigase, Inc." <office@tigase.com>
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License,
 * or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. Look for COPYING file in the top folder.
 * If not, see http://www.gnu.org/licenses/.
 *
 */
package tigase.server.xmppclient;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import tigase.net.IOServiceListener;
import tigase.net.SocketThread;
import tigase.server.Command;
import tigase.server.ConnectionManager;
import tigase.server.Packet;
import tigase.server.Presence;
import tigase.util.TimerTask;
import tigase.xml.Element;
import tigase.xmpp.Authorization;
import tigase.xmpp.BareJID;
import tigase.xmpp.JID;
import tigase.xmpp.PacketErrorTypeException;
import tigase.xmpp.StanzaType;
import tigase.xmpp.XMPPIOService;

/**
 * Class implements XEP-0198 Stream Management
 *
 * @author andrzej
 */
public class StreamManagementIOProcessor implements XMPPIOProcessor {

     private static final Logger log = Logger.getLogger(StreamManagementIOProcessor.class.getCanonicalName());

     public static final String XMLNS = "urn:xmpp:sm:3";

     // used tag names
     private static final String ACK_NAME = "a";
     private static final String ENABLE_NAME = "enable";
     private static final String ENABLED_NAME = "enabled";
     private static final String REQ_NAME = "r";
     private static final String RESUME_NAME = "resume";
     private static final String RESUMED_NAME = "resumed";
          // used attribute names
     private static final String H_ATTR = "h";
     private static final String LOCATION_ATTR = "location";
     private static final String RESUME_ATTR = "resume";
     private static final String MAX_ATTR = "max";
     private static final String PREVID_ATTR = "previd";
               // various strings used as key to store data in maps
     private static final String INGORE_UNDELIVERED_PRESENCE_KEY = "ignore-undelivered-presence";
     private static final String IN_COUNTER_KEY = XMLNS + "_in";
     private static final String MAX_RESUMPTION_TIMEOUT_KEY = XMLNS + "_resumption-timeout";
     private static final String OUT_COUNTER_KEY = XMLNS + "_out";
     private static final String RESUMPTION_TASK_KEY = XMLNS + "_resumption-task";
     private static final String RESUMPTION_TIMEOUT_PROP_KEY = "resumption-timeout";
     private static final String RESUMPTION_TIMEOUT_START_KEY = "resumption-timeout-start";
     private static final String STREAM_ID_KEY = XMLNS + "_stream_id";
          private static final Element[] FEATURES = { new Element("sm", new String[] { "xmlns" },
               new String[] { XMLNS }) };
          private final ConcurrentHashMap<String,XMPPIOService> services = new ConcurrentHashMap<String,XMPPIOService>();
          private boolean ignoreUndeliveredPresence = true;
     private int resumption_timeout = 60;
     private int default_ack_request_count = 10;
          private ConnectionManager connectionManager;
     /**
      * Method returns true if XMPPIOService has enabled SM.
      *
      * @param service
      */
     public static boolean isEnabled(XMPPIOService service) {
          return service.getSessionData().containsKey(IN_COUNTER_KEY);
     }
          private static boolean isResumptionEnabled(XMPPIOService service) {
          return service.getSessionData().containsKey(STREAM_ID_KEY);
     }
          public StreamManagementIOProcessor() {
     }

     @Override
     public String getId() {
          return XMLNS;
     }
          @Override
     public void setConnectionManager(ConnectionManager connectionManager) {
          this.connectionManager = connectionManager;
     }

     @Override
     public Element[] supStreamFeatures(XMPPIOService service) {
          // user jid may not be set yet because it is set during resource binding
          // while this feature should be advertised just after authentication of
          // connection
          /*if (service.getUserJid() == null)
               return null;*/
                    return FEATURES;
     }
          @Override
     public boolean processIncoming(XMPPIOService service, Packet packet) {
          if (!isEnabled(service)) {
               if (packet.getXMLNS() != XMLNS) {
                    return false;
               }
               else if (packet.getElemName() == ENABLE_NAME) {
                    service.getSessionData().putIfAbsent(IN_COUNTER_KEY, new Counter());
                    service.getSessionData().putIfAbsent(OUT_COUNTER_KEY, new OutQueue());
                    String id = null;
                    String location = null;
                    int max = resumption_timeout;

                    if (resumption_timeout > 0 && packet.getElement().getAttributeStaticStr(RESUME_ATTR) != null) {
                         String maxStr = packet.getElement().getAttributeStaticStr(MAX_ATTR);
                         if (maxStr != null) {
                              max = Math.min(max, Integer.parseInt(maxStr));
                         }
                         id = UUID.randomUUID().toString();
                         location = connectionManager.getDefHostName().toString();
                         service.getSessionData().putIfAbsent(STREAM_ID_KEY, id);
                         service.getSessionData().put(MAX_RESUMPTION_TIMEOUT_KEY, max);
                                                  services.put(id, service);
                    }
                    try {
                         service.writeRawData("<" + ENABLED_NAME + " xmlns='" + XMLNS + "'"
                                   + ( id != null ? " id='" + id + "' " + RESUME_ATTR + "='true' "+ MAX_ATTR + "='" + max + "'" : "" )
                                   + ( location != null ? " " + LOCATION_ATTR + "='" + location + "'" : "" ) + " />");
                    }
                    catch (IOException ex) {
                         if (log.isLoggable(Level.FINE)) {
                              log.log(Level.FINE, "exception during sending <enabled/>, stopping...", ex);
                         }
                         service.forceStop();
                    }
                    return true;
               }
               else if (packet.getElemName() == RESUME_NAME) {
                    String h = packet.getElement().getAttributeStaticStr(H_ATTR);
                    String id = packet.getElement().getAttributeStaticStr(PREVID_ATTR);
                                        try {
                         resumeStream(service, id, Integer.parseInt(h));
                    } catch (IOException ex) {
                         if (log.isLoggable(Level.FINE)) {
                              log.log(Level.FINE, "exception while resuming stream for user "
                                        + service.getUserJid() + " with id " + id, ex);
                         }
                                                  service.forceStop();
                    }
                    return true;
               }
               else {
                    return false;
               }
          }
          if (packet.getXMLNS() == XMLNS) {
               if (packet.getElemName() == ACK_NAME) {
                    String valStr = packet.getAttributeStaticStr(H_ATTR);
                                        int val = Integer.parseInt(valStr);
                    OutQueue outQueue = (OutQueue) service.getSessionData().get(OUT_COUNTER_KEY);
                    outQueue.ack(val);
               }
               else if (packet.getElemName() == REQ_NAME) {
                    int value = ((Counter) service.getSessionData().get(IN_COUNTER_KEY)).get();
                                        try {
                         service.writeRawData("<" + ACK_NAME + " xmlns='" + XMLNS
                                   + "' " + H_ATTR + "='" + String.valueOf(value) + "'/>");
                    }
                    catch (IOException ex) {
                         if (log.isLoggable(Level.FINE)) {
                              log.log(Level.FINE, "exception during sending <a/> as "
                                        + "response for <r/>, stopping...", ex);
                         }
                         service.forceStop();
                    }
               }
               return true;
          }
                    ((Counter) service.getSessionData().get(IN_COUNTER_KEY)).inc();
                    return false;
     }

     @Override
     public boolean processOutgoing(XMPPIOService service, Packet packet) {
          if (!isEnabled(service) || packet.getXMLNS() == XMLNS) {
               return false;
          }
          OutQueue outQueue = (OutQueue) service.getSessionData().get(OUT_COUNTER_KEY);

          outQueue.append(packet);
                    return service.getSessionData().containsKey(RESUMPTION_TASK_KEY);
     }
          @Override
     public void packetsSent(XMPPIOService service) throws IOException {
          if (!isEnabled(service))
               return;
          OutQueue outQueue = (OutQueue) service.getSessionData().get(OUT_COUNTER_KEY);

          if (outQueue != null && outQueue.waitingForAck() >= default_ack_request_count) {
               service.writeRawData("<" + REQ_NAME + " xmlns='" + XMLNS + "' />");
          }
     }
          @Override
     public void processCommand(XMPPIOService service, Packet pc) {
          String cmdId = Command.getFieldValue(pc, "cmd");
          if ("stream-moved".equals(cmdId)) {
               String newConn = Command.getFieldValue(pc, "new-conn-jid");
                              String id = (String) service.getSessionData().get(STREAM_ID_KEY);
                                             JID newConnJid = JID.jidInstanceNS(newConn);
               XMPPIOService newService = connectionManager.getXMPPIOService(newConnJid.getResource());
               // if connection was closed during resumption, then close
               // old connection as it would not be able to resume
               if (newService != null) {
                    if (log.isLoggable(Level.FINEST)) {
                         log.log(Level.FINEST, "stream for user {2} moved from {0} to {1}", new Object[] {
                              service.getConnectionId(), newService.getConnectionId(), newService.getUserJid() });
                    }
                    try {
                         newService.setUserJid(service.getUserJid());
                         Counter inCounter = (Counter) newService.getSessionData().get(IN_COUNTER_KEY);
                         newService.writeRawData("<" + RESUMED_NAME + " xmlns='" + XMLNS + "' " + PREVID_ATTR + "='"
                                   + id + "' " + H_ATTR + "='" + inCounter.get() + "' />");
                         service.getSessionData().put("stream-closed", "stream-closed");
                         services.put(id, newService);
                                                  // resending packets thru new connection
                         OutQueue outQueue = (OutQueue) newService.getSessionData().get(OUT_COUNTER_KEY);
                         List<Packet> packetsToResend = new ArrayList<Packet>(outQueue.getQueue());
                         for (Packet packet : packetsToResend) {
                              newService.addPacketToSend(packet);
                         }
                                                                           // if there is any packet waiting we need to write them to socket
                         // and to do that we need to call processWaitingPackets();
                         if (!packetsToResend.isEmpty()) {
                              if (newService.writeInProgress.tryLock()) {
                                   try {
                                        newService.processWaitingPackets();
                                        SocketThread.addSocketService(newService);
                                   } catch (Exception e) {
                                        log.log(Level.WARNING, newService + "Exception during writing packets: ", e);
                                        try {
                                             newService.stop();
                                        } catch (Exception e1) {
                                             log.log(Level.WARNING, newService + "Exception stopping XMPPIOService: ", e1);
                                        }    // end of try-catch
                                   } finally {
                                        newService.writeInProgress.unlock();
                                   }
                              }
                         }
                    }
                    catch (IOException ex) {
                         if (log.isLoggable(Level.FINEST)) {
                              log.log(Level.FINEST, "could not confirm session resumption for user = "
                                        + newService.getUserJid(), ex);
                         }
                                                  // remove new connection if resumption failed
                         services.remove(id, service);
                         services.remove(id, newService);
                    }
               }
               else {
                    if (log.isLoggable(Level.FINEST)) {
                         log.log(Level.FINEST, "no new service available for user {0} to resume from {1},"
                                   + " already closed?", new Object[] { service.getUserJid(), service });
                    }
               }
                              if (log.isLoggable(Level.FINEST)) {
                    log.log(Level.FINEST, "closing old service {0} for user {1}", new Object[] { service,
                         service.getUserJid()});
               }
                              // stopping old service
               connectionManager.serviceStopped(service);
          }
     }
          @Override
     public boolean serviceStopped(XMPPIOService service, boolean streamClosed) {
          if (!isEnabled(service)) {
               if (log.isLoggable(Level.FINEST)) {
                    log.log(Level.FINEST, "{0}, service stopped - StreamManagement disabled", new Object[] { service });
               }
               return false;
          }

          String id = (String) service.getSessionData().get(STREAM_ID_KEY);
                    if (streamClosed) {
               service.getSessionData().remove(STREAM_ID_KEY);
          }
                    //          try {
//               throw new Exception();
//          } catch (Throwable ex) {
//               log.log(Level.WARNING, "resumption timeout started, stream close = " + streamClosed, ex);
//               ex.printStackTrace();
//          }
          Long resumptionTimeoutStart = (Long) service.getSessionData().get(RESUMPTION_TIMEOUT_START_KEY);
          if (resumptionTimeoutStart != null) {
               if (log.isLoggable(Level.FINEST)) {
                    log.log(Level.FINEST, "{0}, service stopped - checking resumption timeout", new Object[] { service });
               }
               // if resumptionTimeoutStart is set let's check if resumption was
               // not started for longer time than twice value of resumption_timeout
               if ((System.currentTimeMillis() - resumptionTimeoutStart) > (2 * resumption_timeout * 1000)) {
                    // if so we should assume that resumption failed so we should
                    // send errors, remove reference to service and stop this service
                    services.remove(id, service);
                    service.clearWaitingPackets();
                    connectionManager.serviceStopped(service);
                    sendErrorsForQueuedPackets(service);
               }
               return false;
          }
          // some buggy client (ie. Psi) may close stream without sending stream
          // close which forces us to treat this stream as broken and waiting for
          // resumption but those clients are not compatible with XEP-0198 and
          // resumption so this should not happen
          if (isResumptionEnabled(service)) {
               if (!services.containsKey(id)) {
                    if (log.isLoggable(Level.FINEST)) {
                         log.log(Level.FINEST, "{0}, service stopped - resumption enabled but service not available", new Object[] { service });
                    }
                    return false;
               }

               // ConnectionManager must not be notified about closed connection
               // but connection needs to be closed so this is this case we still
               // return false to call forceStop but we remove IOServiceListener
               service.setIOServiceListener((IOServiceListener) null);

               int resumptionTimeout = (Integer) service.getSessionData().get(MAX_RESUMPTION_TIMEOUT_KEY);
               synchronized (service) {
                    if (!service.getSessionData().containsKey(RESUMPTION_TASK_KEY)) {
                         TimerTask timerTask = new ResumptionTimeoutTask(service);
                         service.getSessionData().put(RESUMPTION_TASK_KEY, timerTask);
                         connectionManager.addTimerTask(timerTask, resumptionTimeout * 1000);
                                                  // set timestamp of begining of resumption to be able to detect
                         // if something went wrong during resumption and service is
                         // still kept in connection manager services as active service
                         // after twice as long as resumption timeout
                         service.getSessionData().put(RESUMPTION_TIMEOUT_START_KEY, System.currentTimeMillis());
                         service.clearWaitingPackets();
                         if (log.isLoggable(Level.FINEST)) {
                              log.log(Level.FINEST, "{0}, service stopped - resumption enabled and timeout started", new Object[] { service });
                         }
                    }
               }
                              return false;
          }
          else if (id != null) {
               services.remove(id, service);
          }

          if (log.isLoggable(Level.FINEST)) {
               log.log(Level.FINEST, "{0}, service stopped - resumption disabled, sending unacked packets", new Object[] { service });
          }

          service.clearWaitingPackets();
          connectionManager.serviceStopped(service);
          sendErrorsForQueuedPackets(service);
          return false;
     }

     @Override
     public void setProperties(Map<String,Object> props) {
          if (props.containsKey(RESUMPTION_TIMEOUT_PROP_KEY)) {
               this.resumption_timeout = (Integer) props.get(RESUMPTION_TIMEOUT_PROP_KEY);
          }
          if (props.containsKey(INGORE_UNDELIVERED_PRESENCE_KEY)) {
               this.ignoreUndeliveredPresence = (Boolean) props.get(INGORE_UNDELIVERED_PRESENCE_KEY);
          }
     }
     /**
      * Method responsible for starting process of stream resumption
      *
      * @param service
      * @param id
      * @param h
      * @throws IOException
      */
     private void resumeStream(XMPPIOService service, String id, int h) throws IOException {
          XMPPIOService oldService = services.get(id);
          if (oldService == null || !isSameUser(oldService, service)) {
               // should send failed!
               service.writeRawData("<failed xmlns='" + XMLNS + "'>"
                         + "<item-not-found xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>"
                         + "</failed>");
               return;
          }

          // if stream has resource binded then we should not resume
          if (service.getUserJid() != null && JID.jidInstanceNS(service.getUserJid()).getResource() != null) {
               service.writeRawData("<failed xmlns='" + XMLNS + "'>"
                         + "<unexpected-request xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>"
                         + "</failed>");
               return;
          }
                    if (services.remove(id, oldService)) {
               synchronized (oldService) {
                    TimerTask timerTask = (TimerTask) oldService.getSessionData().remove(RESUMPTION_TASK_KEY);
                    if (timerTask != null) {
                         timerTask.cancel();
                    }
                    oldService.getSessionData().put(RESUMPTION_TASK_KEY, true);
                    oldService.clearWaitingPackets();
               }

               // get old out queue
               OutQueue outQueue = (OutQueue) oldService.getSessionData().get(OUT_COUNTER_KEY);
               outQueue.ack(h);

               // move required data from old XMPPIOService session data to new service session data
               service.getSessionData().put(OUT_COUNTER_KEY, outQueue);
               service.getSessionData().put(MAX_RESUMPTION_TIMEOUT_KEY,
                         oldService.getSessionData().get(MAX_RESUMPTION_TIMEOUT_KEY));
               service.getSessionData().put(IN_COUNTER_KEY,
                         oldService.getSessionData().get(IN_COUNTER_KEY));
               service.getSessionData().put(STREAM_ID_KEY,
                         oldService.getSessionData().get(STREAM_ID_KEY));

               // send notification to session manager about change of connection
               // used for session
               Packet cmd = Command.STREAM_MOVED.getPacket(service.getConnectionId(),
                         service.getDataReceiver(), StanzaType.set, "moved");
               cmd.setPacketFrom(service.getConnectionId());
               cmd.setPacketTo(service.getDataReceiver());
               Command.addFieldValue(cmd, "old-conn-jid", oldService.getConnectionId().toString());
               connectionManager.processOutPacket(cmd);
          }
          else {
               // should send failed!
               service.writeRawData("<failed xmlns='" + XMLNS + "'>"
                         + "<item-not-found xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>"
                         + "</failed>");
          }
     }
     /**
      * Verifies if connections are authenticate for same bare jid
      *
      * @param oldService
      * @param newService
      * @return true - only when bare jids are the same
      */
     private boolean isSameUser(XMPPIOService oldService, XMPPIOService newService) {
          if (oldService.getUserJid() == null || newService.getUserJid() == null)
               return false;
                    JID oldUserJid = JID.jidInstanceNS(oldService.getUserJid());
          JID newUserJid = JID.jidInstanceNS(newService.getUserJid());
                    return oldUserJid.getBareJID().equals(newUserJid.getBareJID());
     }
     /**
      * Method responsible for sending recipient-unavailable error for all not acked packets
      *
      * @param service
      */
     private void sendErrorsForQueuedPackets(XMPPIOService service) {
          service.clearWaitingPackets();

          OutQueue outQueue = (OutQueue) service.getSessionData().remove(OUT_COUNTER_KEY);

          if (outQueue != null) {
               Packet packet = null;

               while ((packet = outQueue.queue.poll()) != null) {
                    connectionManager.processUndeliveredPacket(packet, null);
               }
          }
     }
     /**
      * ResumptionTimeoutTask class is used for handing of timeout used during
      * session resumption
      */
     private class ResumptionTimeoutTask extends TimerTask {

          private final XMPPIOService service;
                    public ResumptionTimeoutTask(XMPPIOService service) {
               this.service = service;
          }
                    @Override
          public void run() {
               String id = (String) service.getSessionData().get(STREAM_ID_KEY);

               if (services.remove(id, service)) {
                    //service.getSessionData().put(SERVICE_STOP_ALLOWED_KEY, true);
                    connectionManager.serviceStopped(service);
                    sendErrorsForQueuedPackets(service);
               }
          }
               }
          /**
      * Counter class implements proper counter with overflow from 2^32-1 to 0
      */
     private static class Counter {
                    private int counter = 0;
                    /**
           * Increment counter
           */
          public void inc() {
               counter++;
               if (counter < 0)
                    counter = 0;
          }
          /**
           * Get value of counter.
           *
           */
          public int get() {
               return counter;
          }

          /**
           * Sets value of a counter - use only for testing!
           *
           * @param value
           */
          protected void setCounter(int value) {
               this.counter = value;
          }
     }
     /**
      * OutQueue class implements queue of outgoing packets waiting for ack
      * with implementation of removing acked elements when id of acked packet
      * is passed
      */
     public static class OutQueue extends Counter {
                    private final ArrayDeque<Packet> queue = new ArrayDeque<Packet>();
          /**
           * Append packet to waiting for ack queue
           *
           * @param packet
           */
          public void append(Packet packet) {
               if (!packet.wasProcessedBy(XMLNS)) {
                    packet.processedBy(XMLNS);
                    queue.offer(packet);
                    inc();
               }
          }
          /**
           * Confirm delivery of packets up to count passed as value
           *
           * @param value
           */
          public void ack(int value) {
               int count = get() - value;
                              if (count < 0) {
                    count = (Integer.MAX_VALUE - value) + get() + 1;
               }
                              while (count < queue.size()) {
                    queue.poll();
               }
          }
          /**
           * Returns size of queue containing packets waiting for ack
           *
           */
          public int waitingForAck() {
               return queue.size();
          }

          /**
           * Method returns internal queue with packets waiting for ack - use testing
           * only!
           *
           */
          protected ArrayDeque<Packet> getQueue() {
               return queue;
          }
               }
}

Forgive my ignorance, but why not use BOSH? To all intents and purposes, it seems to offer a sort of stream management, with support for reattaching and pausing connections.

Personally, I don’t like BOSH; it may cause some overhead across the system. But I’m not very experienced with it. Maybe it would be useful to share your experience with us all.

Hello Stefano Galizia,

**Heartfelt thanks for sharing this kind of work with the community.**

**I am interested in understanding the logic behind this code. How can I contact you about this?**

And thanks also to Dele Olajide.

One of the problems that XEP-0198 solves is the handling of messages that get lost between the disconnection and its detection on the other side. It requires some extra processing on both the server and the client side to queue and resend the missing messages. With BOSH, this is even more critical, as it takes longer to detect the disconnection. The flip side is that resuming a stream with BOSH could be simpler.

Yep, I agree with you; that’s exactly what we have done on both the server and the client side.

Hi Stefano Galizia,

I want to understand how you implemented this acknowledgement feature. How can I contact you about this?
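
To make the queue-and-resend idea concrete, here is a minimal sketch of the sender-side bookkeeping in the style of XEP-0198. It is only an illustration under stated assumptions: the class `UnackedStanzaBuffer` and its method names are made up for this example, stanzas are plain strings, and the only detail taken from the specification is that the `h` counter counts handled stanzas and wraps from 2^32-1 back to 0.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Illustrative sender-side buffer of stanzas that were sent but not yet acknowledged. */
class UnackedStanzaBuffer {

    /** The h counter is an unsigned 32-bit value, so all arithmetic is modulo 2^32. */
    private static final long MODULUS = 1L << 32;

    private final Deque<String> unacked = new ArrayDeque<>();
    private long sentCount;

    UnackedStanzaBuffer() {
        this(0);
    }

    /** Starting from a non-zero count is only useful for exercising the wrap-around. */
    UnackedStanzaBuffer(long initialSentCount) {
        this.sentCount = Math.floorMod(initialSentCount, MODULUS);
    }

    /** Record a stanza as sent; it stays buffered until the peer acknowledges it. */
    void onSend(String stanza) {
        unacked.addLast(stanza);
        sentCount = (sentCount + 1) % MODULUS;
    }

    /** Handle an acknowledgement carrying h, the number of stanzas the peer has handled. */
    void onAck(long h) {
        long stillUnacked = Math.floorMod(sentCount - h, MODULUS);
        if (stillUnacked > unacked.size()) {
            return; // h does not match anything we sent; ignore it in this sketch
        }
        while (unacked.size() > stillUnacked) {
            unacked.pollFirst();
        }
    }

    /** After a reconnect, everything still buffered has to be sent again, in order. */
    List<String> pendingResend() {
        return new ArrayList<>(unacked);
    }

    public static void main(String[] args) {
        UnackedStanzaBuffer buffer = new UnackedStanzaBuffer();
        buffer.onSend("<message id='a'/>");
        buffer.onSend("<message id='b'/>");
        buffer.onSend("<message id='c'/>");
        buffer.onAck(2); // the peer handled the first two stanzas before the connection dropped
        System.out.println(buffer.pendingResend());
    }
}
```

The same bookkeeping is needed on both sides of the stream, which is the extra processing mentioned above; a real implementation would also have to bound the buffer and decide what to do with stanzas that can never be resent.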

Thanks in advance.

Rajesh

Please contact me at sgalizia75@gmail.com