001 /* 002 * Cumulus4j - Securing your data in the cloud - http://cumulus4j.org 003 * Copyright (C) 2011 NightLabs Consulting GmbH 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 package org.cumulus4j.store.crypto.keymanager.messagebroker.inmemory; 019 020 import java.util.concurrent.ConcurrentHashMap; 021 import java.util.concurrent.ConcurrentLinkedQueue; 022 import java.util.concurrent.TimeoutException; 023 024 import org.cumulus4j.keymanager.back.shared.Request; 025 import org.cumulus4j.keymanager.back.shared.Response; 026 import org.cumulus4j.store.crypto.keymanager.messagebroker.AbstractMessageBroker; 027 import org.cumulus4j.store.crypto.keymanager.rest.ErrorResponseException; 028 import org.slf4j.Logger; 029 import org.slf4j.LoggerFactory; 030 031 /** 032 * <p> 033 * Implementation of {@link org.cumulus4j.store.crypto.keymanager.messagebroker.MessageBroker MessageBroker} which 034 * works only in a single JVM. It manages all messages in-memory. 035 * </p> 036 * <p> 037 * <b>Important:</b> This implementation can usually not be used in a cluster! It is only cluster-able, if you use transparent 038 * JVM-clustering, e.g. with <a target="_blank" href="http://www.terracotta.org/">Terracotta</a>! 039 * </p> 040 * 041 * @author Marco หงุ่ยตระกูล-Schulze - marco at nightlabs dot de 042 */ 043 public class MessageBrokerInMemory 044 extends AbstractMessageBroker 045 { 046 private static final Logger logger = LoggerFactory.getLogger(MessageBrokerInMemory.class); 047 048 private ConcurrentHashMap<String, ConcurrentLinkedQueue<Request>> cryptoSessionIDPrefix2requestsWaitingForProcessing = new ConcurrentHashMap<String, ConcurrentLinkedQueue<Request>>(); 049 private ConcurrentHashMap<String, Request> requestID2requestCurrentlyBeingProcessed = new ConcurrentHashMap<String, Request>(); 050 051 /** 052 * When a request was completed and a response returned, both are stored together here. 053 */ 054 private ConcurrentHashMap<Request, Response> request2response = new ConcurrentHashMap<Request, Response>(); 055 056 private ConcurrentLinkedQueue<Request> getRequestsWaitingForProcessing(String cryptoSessionIDPrefix) 057 { 058 if (cryptoSessionIDPrefix == null) 059 throw new IllegalArgumentException("cryptoSessionIDPrefix == null"); 060 061 ConcurrentLinkedQueue<Request> requestsWaitingForProcessing = cryptoSessionIDPrefix2requestsWaitingForProcessing.get(cryptoSessionIDPrefix); 062 if (requestsWaitingForProcessing == null) { 063 requestsWaitingForProcessing = new ConcurrentLinkedQueue<Request>(); 064 cryptoSessionIDPrefix2requestsWaitingForProcessing.putIfAbsent(cryptoSessionIDPrefix, requestsWaitingForProcessing); 065 requestsWaitingForProcessing = cryptoSessionIDPrefix2requestsWaitingForProcessing.get(cryptoSessionIDPrefix); 066 } 067 return requestsWaitingForProcessing; 068 } 069 070 public MessageBrokerInMemory() { 071 // TODO We should try to find out if we run in a Terracotta-environment (e.g. check a system property, if available) and 072 // throw an exception here, if we are not running with Terracotta. For now, I always log a warning. Marco :-) 073 logger.warn("MessageBrokerInMemory instantiated. This implementation is NOT cluster-able without Terracotta! You MUST NOT use it, if you do not have transparent JVM-clustering present!"); 074 } 075 076 @Override 077 protected Response _query(Class<? extends Response> responseClass, Request request) 078 throws TimeoutException, ErrorResponseException 079 { 080 return _query(responseClass, request, getQueryTimeout()); 081 } 082 083 protected Response _query(Class<? extends Response> responseClass, Request request, long queryTimeout) 084 throws TimeoutException, ErrorResponseException 085 { 086 ConcurrentLinkedQueue<Request> requestsWaitingForProcessing = getRequestsWaitingForProcessing(request.getCryptoSessionIDPrefix()); 087 requestsWaitingForProcessing.add(request); // This is a thread-safe object, hence add BEFORE synchronizing. 088 089 synchronized (requestsWaitingForProcessing) { // synchronized only necessary for notification. 090 requestsWaitingForProcessing.notify(); 091 } 092 093 long beginTimestamp = System.currentTimeMillis(); 094 Response response; 095 do { 096 097 synchronized(request2response) { 098 try { 099 request2response.wait(10000L); 100 } catch (InterruptedException e) { 101 // ignore - only log. 102 logger.warn("_query: request2response.wait(...) was interrupted with an InterruptedException."); 103 } 104 } 105 106 response = request2response.remove(request); 107 108 if (response == null && System.currentTimeMillis() - beginTimestamp > queryTimeout) { 109 logger.warn("_query: Request {} for session {} was not answered within timeout.", request.getRequestID(), request.getCryptoSessionID()); 110 111 boolean removed = requestsWaitingForProcessing.remove(request); 112 if (removed) 113 logger.warn("_query: Request {} for session {} was still in 'requestsWaitingForProcessing'.", request.getRequestID(), request.getCryptoSessionID()); 114 115 Request removedRequest = requestID2requestCurrentlyBeingProcessed.remove(request.getRequestID()); 116 if (removedRequest != null) 117 logger.warn("_query: Request {} for session {} was in 'requestID2requestCurrentlyBeingProcessed'.", request.getRequestID(), request.getCryptoSessionID()); 118 119 throw new TimeoutException("Request was not answered within timeout: " + request); 120 } 121 122 } while (response == null); 123 124 return response; 125 } 126 127 @Override 128 protected Request _pollRequest(String cryptoSessionIDPrefix) 129 { 130 return _pollRequest(cryptoSessionIDPrefix, getPollRequestTimeout()); 131 } 132 133 protected Request _pollRequest(String cryptoSessionIDPrefix, long pollRequestTimeout) 134 { 135 ConcurrentLinkedQueue<Request> requestsWaitingForProcessing = getRequestsWaitingForProcessing(cryptoSessionIDPrefix); 136 137 long beginTimestamp = System.currentTimeMillis(); 138 Request request; 139 do { 140 request = requestsWaitingForProcessing.poll(); 141 142 if (request == null) { 143 if (System.currentTimeMillis() - beginTimestamp > pollRequestTimeout) 144 break; 145 146 synchronized(requestsWaitingForProcessing) { 147 try { 148 requestsWaitingForProcessing.wait(10000L); 149 } catch (InterruptedException e) { 150 // ignore - only log - and break loop. 151 logger.warn("_pollRequest: requestsWaitingForProcessing.wait(...) was interrupted with an InterruptedException."); 152 break; 153 } 154 } 155 } 156 } while (request == null); 157 158 if (request != null) { 159 requestID2requestCurrentlyBeingProcessed.put(request.getRequestID(), request); 160 } 161 162 return request; 163 } 164 165 @Override 166 protected void _pushResponse(Response response) 167 { 168 Request request = requestID2requestCurrentlyBeingProcessed.remove(response.getRequestID()); 169 if (request == null) { 170 logger.warn("pushResponse: There is no request currently being processed with requestID={}!!!", response.getRequestID()); 171 } 172 else { 173 request2response.put(request, response); // thread-safe instance => put BEFORE synchronized block 174 synchronized (request2response) { // synchronized block only necessary for notification. 175 request2response.notifyAll(); 176 } 177 } 178 } 179 }