001    /*
002     * Cumulus4j - Securing your data in the cloud - http://cumulus4j.org
003     * Copyright (C) 2011 NightLabs Consulting GmbH
004     *
005     * This program is free software: you can redistribute it and/or modify
006     * it under the terms of the GNU Affero General Public License as
007     * published by the Free Software Foundation, either version 3 of the
008     * License, or (at your option) any later version.
009     *
010     * This program is distributed in the hope that it will be useful,
011     * but WITHOUT ANY WARRANTY; without even the implied warranty of
012     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013     * GNU Affero General Public License for more details.
014     *
015     * You should have received a copy of the GNU Affero General Public License
016     * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017     */
018    package org.cumulus4j.store.crypto.keymanager.messagebroker.inmemory;
019    
020    import java.util.concurrent.ConcurrentHashMap;
021    import java.util.concurrent.ConcurrentLinkedQueue;
022    import java.util.concurrent.TimeoutException;
023    
024    import org.cumulus4j.keymanager.back.shared.Request;
025    import org.cumulus4j.keymanager.back.shared.Response;
026    import org.cumulus4j.store.crypto.keymanager.messagebroker.AbstractMessageBroker;
027    import org.cumulus4j.store.crypto.keymanager.rest.ErrorResponseException;
028    import org.slf4j.Logger;
029    import org.slf4j.LoggerFactory;
030    
031    /**
032     * <p>
033     * Implementation of {@link org.cumulus4j.store.crypto.keymanager.messagebroker.MessageBroker MessageBroker} which
034     * works only in a single JVM. It manages all messages in-memory.
035     * </p>
036     * <p>
037     * <b>Important:</b> This implementation can usually not be used in a cluster! It is only cluster-able, if you use transparent
038     * JVM-clustering, e.g. with <a target="_blank" href="http://www.terracotta.org/">Terracotta</a>!
039     * </p>
040     *
041     * @author Marco หงุ่ยตระกูล-Schulze - marco at nightlabs dot de
042     */
043    public class MessageBrokerInMemory
044    extends AbstractMessageBroker
045    {
046            private static final Logger logger = LoggerFactory.getLogger(MessageBrokerInMemory.class);
047    
048            private ConcurrentHashMap<String, ConcurrentLinkedQueue<Request>> cryptoSessionIDPrefix2requestsWaitingForProcessing = new ConcurrentHashMap<String, ConcurrentLinkedQueue<Request>>();
049            private ConcurrentHashMap<String, Request> requestID2requestCurrentlyBeingProcessed = new ConcurrentHashMap<String, Request>();
050    
051            /**
052             * When a request was completed and a response returned, both are stored together here.
053             */
054            private ConcurrentHashMap<Request, Response> request2response = new ConcurrentHashMap<Request, Response>();
055    
056            private ConcurrentLinkedQueue<Request> getRequestsWaitingForProcessing(String cryptoSessionIDPrefix)
057            {
058                    if (cryptoSessionIDPrefix == null)
059                            throw new IllegalArgumentException("cryptoSessionIDPrefix == null");
060    
061                    ConcurrentLinkedQueue<Request> requestsWaitingForProcessing = cryptoSessionIDPrefix2requestsWaitingForProcessing.get(cryptoSessionIDPrefix);
062                    if (requestsWaitingForProcessing == null) {
063                            requestsWaitingForProcessing = new ConcurrentLinkedQueue<Request>();
064                            cryptoSessionIDPrefix2requestsWaitingForProcessing.putIfAbsent(cryptoSessionIDPrefix, requestsWaitingForProcessing);
065                            requestsWaitingForProcessing = cryptoSessionIDPrefix2requestsWaitingForProcessing.get(cryptoSessionIDPrefix);
066                    }
067                    return requestsWaitingForProcessing;
068            }
069    
070            public MessageBrokerInMemory() {
071                    // TODO We should try to find out if we run in a Terracotta-environment (e.g. check a system property, if available) and
072                    // throw an exception here, if we are not running with Terracotta. For now, I always log a warning. Marco :-)
073                    logger.warn("MessageBrokerInMemory instantiated. This implementation is NOT cluster-able without Terracotta! You MUST NOT use it, if you do not have transparent JVM-clustering present!");
074            }
075    
076            @Override
077            protected Response _query(Class<? extends Response> responseClass, Request request)
078            throws TimeoutException, ErrorResponseException
079            {
080                    return _query(responseClass, request, getQueryTimeout());
081            }
082    
083            protected Response _query(Class<? extends Response> responseClass, Request request, long queryTimeout)
084            throws TimeoutException, ErrorResponseException
085            {
086                    ConcurrentLinkedQueue<Request> requestsWaitingForProcessing = getRequestsWaitingForProcessing(request.getCryptoSessionIDPrefix());
087                    requestsWaitingForProcessing.add(request); // This is a thread-safe object, hence add BEFORE synchronizing.
088    
089                    synchronized (requestsWaitingForProcessing) { // synchronized only necessary for notification.
090                            requestsWaitingForProcessing.notify();
091                    }
092    
093                    long beginTimestamp = System.currentTimeMillis();
094                    Response response;
095                    do {
096    
097                            synchronized(request2response) {
098                                    try {
099                                            request2response.wait(10000L);
100                                    } catch (InterruptedException e) {
101                                            // ignore - only log.
102                                            logger.warn("_query: request2response.wait(...) was interrupted with an InterruptedException.");
103                                    }
104                            }
105    
106                            response = request2response.remove(request);
107    
108                            if (response == null && System.currentTimeMillis() - beginTimestamp > queryTimeout) {
109                                    logger.warn("_query: Request {} for session {} was not answered within timeout.", request.getRequestID(), request.getCryptoSessionID());
110    
111                                    boolean removed = requestsWaitingForProcessing.remove(request);
112                                    if (removed)
113                                            logger.warn("_query: Request {} for session {} was still in 'requestsWaitingForProcessing'.", request.getRequestID(), request.getCryptoSessionID());
114    
115                                    Request removedRequest = requestID2requestCurrentlyBeingProcessed.remove(request.getRequestID());
116                                    if (removedRequest != null)
117                                            logger.warn("_query: Request {} for session {} was in 'requestID2requestCurrentlyBeingProcessed'.", request.getRequestID(), request.getCryptoSessionID());
118    
119                                    throw new TimeoutException("Request was not answered within timeout: " + request);
120                            }
121    
122                    } while (response == null);
123    
124                    return response;
125            }
126    
127            @Override
128            protected Request _pollRequest(String cryptoSessionIDPrefix)
129            {
130                    return _pollRequest(cryptoSessionIDPrefix, getPollRequestTimeout());
131            }
132    
133            protected Request _pollRequest(String cryptoSessionIDPrefix, long pollRequestTimeout)
134            {
135                    ConcurrentLinkedQueue<Request> requestsWaitingForProcessing = getRequestsWaitingForProcessing(cryptoSessionIDPrefix);
136    
137                    long beginTimestamp = System.currentTimeMillis();
138                    Request request;
139                    do {
140                            request = requestsWaitingForProcessing.poll();
141    
142                            if (request == null) {
143                                    if (System.currentTimeMillis() - beginTimestamp > pollRequestTimeout)
144                                            break;
145    
146                                    synchronized(requestsWaitingForProcessing) {
147                                            try {
148                                                    requestsWaitingForProcessing.wait(10000L);
149                                            } catch (InterruptedException e) {
150                                                    // ignore - only log - and break loop.
151                                                    logger.warn("_pollRequest: requestsWaitingForProcessing.wait(...) was interrupted with an InterruptedException.");
152                                                    break;
153                                            }
154                                    }
155                            }
156                    } while (request == null);
157    
158                    if (request != null) {
159                            requestID2requestCurrentlyBeingProcessed.put(request.getRequestID(), request);
160                    }
161    
162                    return request;
163            }
164    
165            @Override
166            protected void _pushResponse(Response response)
167            {
168                    Request request = requestID2requestCurrentlyBeingProcessed.remove(response.getRequestID());
169                    if (request == null) {
170                            logger.warn("pushResponse: There is no request currently being processed with requestID={}!!!", response.getRequestID());
171                    }
172                    else {
173                            request2response.put(request, response); // thread-safe instance => put BEFORE synchronized block
174                            synchronized (request2response) { // synchronized block only necessary for notification.
175                                    request2response.notifyAll();
176                            }
177                    }
178            }
179    }