diff options
Diffstat (limited to 'src/org/jivesoftware/smackx/filetransfer/FaultTolerantNegotiator.java')
-rw-r--r-- | src/org/jivesoftware/smackx/filetransfer/FaultTolerantNegotiator.java | 186 |
1 files changed, 186 insertions, 0 deletions
diff --git a/src/org/jivesoftware/smackx/filetransfer/FaultTolerantNegotiator.java b/src/org/jivesoftware/smackx/filetransfer/FaultTolerantNegotiator.java new file mode 100644 index 0000000..22b5b1d --- /dev/null +++ b/src/org/jivesoftware/smackx/filetransfer/FaultTolerantNegotiator.java @@ -0,0 +1,186 @@ +/** + * $RCSfile$ + * $Revision$ + * $Date$ + * + * Copyright 2003-2006 Jive Software. + * + * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jivesoftware.smackx.filetransfer; + +import org.jivesoftware.smack.PacketCollector; +import org.jivesoftware.smack.SmackConfiguration; +import org.jivesoftware.smack.Connection; +import org.jivesoftware.smack.XMPPException; +import org.jivesoftware.smack.filter.OrFilter; +import org.jivesoftware.smack.filter.PacketFilter; +import org.jivesoftware.smack.packet.Packet; +import org.jivesoftware.smackx.packet.StreamInitiation; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.*; +import java.util.List; +import java.util.ArrayList; + + +/** + * The fault tolerant negotiator takes two stream negotiators, the primary and the secondary + * negotiator. If the primary negotiator fails during the stream negotiaton process, the second + * negotiator is used. + */ +public class FaultTolerantNegotiator extends StreamNegotiator { + + private StreamNegotiator primaryNegotiator; + private StreamNegotiator secondaryNegotiator; + private Connection connection; + private PacketFilter primaryFilter; + private PacketFilter secondaryFilter; + + public FaultTolerantNegotiator(Connection connection, StreamNegotiator primary, + StreamNegotiator secondary) { + this.primaryNegotiator = primary; + this.secondaryNegotiator = secondary; + this.connection = connection; + } + + public PacketFilter getInitiationPacketFilter(String from, String streamID) { + if (primaryFilter == null || secondaryFilter == null) { + primaryFilter = primaryNegotiator.getInitiationPacketFilter(from, streamID); + secondaryFilter = secondaryNegotiator.getInitiationPacketFilter(from, streamID); + } + return new OrFilter(primaryFilter, secondaryFilter); + } + + InputStream negotiateIncomingStream(Packet streamInitiation) throws XMPPException { + throw new UnsupportedOperationException("Negotiation only handled by create incoming " + + "stream method."); + } + + final Packet initiateIncomingStream(Connection connection, StreamInitiation initiation) { + throw new UnsupportedOperationException("Initiation handled by createIncomingStream " + + "method"); + } + + public InputStream createIncomingStream(StreamInitiation initiation) throws XMPPException { + PacketCollector collector = connection.createPacketCollector( + getInitiationPacketFilter(initiation.getFrom(), initiation.getSessionID())); + + connection.sendPacket(super.createInitiationAccept(initiation, getNamespaces())); + + ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2); + CompletionService<InputStream> service + = new ExecutorCompletionService<InputStream>(threadPoolExecutor); + List<Future<InputStream>> futures = new ArrayList<Future<InputStream>>(); + InputStream stream = null; + XMPPException exception = null; + try { + futures.add(service.submit(new NegotiatorService(collector))); + futures.add(service.submit(new NegotiatorService(collector))); + + int i = 0; + while (stream == null && i < futures.size()) { + Future<InputStream> future; + try { + i++; + future = service.poll(10, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + continue; + } + + if (future == null) { + continue; + } + + try { + stream = future.get(); + } + catch (InterruptedException e) { + /* Do Nothing */ + } + catch (ExecutionException e) { + exception = new XMPPException(e.getCause()); + } + } + } + finally { + for (Future<InputStream> future : futures) { + future.cancel(true); + } + collector.cancel(); + threadPoolExecutor.shutdownNow(); + } + if (stream == null) { + if (exception != null) { + throw exception; + } + else { + throw new XMPPException("File transfer negotiation failed."); + } + } + + return stream; + } + + private StreamNegotiator determineNegotiator(Packet streamInitiation) { + return primaryFilter.accept(streamInitiation) ? primaryNegotiator : secondaryNegotiator; + } + + public OutputStream createOutgoingStream(String streamID, String initiator, String target) + throws XMPPException { + OutputStream stream; + try { + stream = primaryNegotiator.createOutgoingStream(streamID, initiator, target); + } + catch (XMPPException ex) { + stream = secondaryNegotiator.createOutgoingStream(streamID, initiator, target); + } + + return stream; + } + + public String[] getNamespaces() { + String[] primary = primaryNegotiator.getNamespaces(); + String[] secondary = secondaryNegotiator.getNamespaces(); + + String[] namespaces = new String[primary.length + secondary.length]; + System.arraycopy(primary, 0, namespaces, 0, primary.length); + System.arraycopy(secondary, 0, namespaces, primary.length, secondary.length); + + return namespaces; + } + + public void cleanup() { + } + + private class NegotiatorService implements Callable<InputStream> { + + private PacketCollector collector; + + NegotiatorService(PacketCollector collector) { + this.collector = collector; + } + + public InputStream call() throws Exception { + Packet streamInitiation = collector.nextResult( + SmackConfiguration.getPacketReplyTimeout() * 2); + if (streamInitiation == null) { + throw new XMPPException("No response from remote client"); + } + StreamNegotiator negotiator = determineNegotiator(streamInitiation); + return negotiator.negotiateIncomingStream(streamInitiation); + } + } +} |