001    /*
002     * Cumulus4j - Securing your data in the cloud - http://cumulus4j.org
003     * Copyright (C) 2011 NightLabs Consulting GmbH
004     *
005     * This program is free software: you can redistribute it and/or modify
006     * it under the terms of the GNU Affero General Public License as
007     * published by the Free Software Foundation, either version 3 of the
008     * License, or (at your option) any later version.
009     *
010     * This program is distributed in the hope that it will be useful,
011     * but WITHOUT ANY WARRANTY; without even the implied warranty of
012     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013     * GNU Affero General Public License for more details.
014     *
015     * You should have received a copy of the GNU Affero General Public License
016     * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017     */
018    package org.cumulus4j.store;
019    
020    import java.util.HashMap;
021    import java.util.HashSet;
022    import java.util.Locale;
023    import java.util.Map;
024    
025    import javax.jdo.JDOHelper;
026    import javax.jdo.PersistenceManager;
027    import javax.jdo.PersistenceManagerFactory;
028    import javax.transaction.xa.XAException;
029    import javax.transaction.xa.XAResource;
030    import javax.transaction.xa.Xid;
031    
032    import org.cumulus4j.store.model.ClassMeta;
033    import org.cumulus4j.store.model.DataEntry;
034    import org.cumulus4j.store.model.EncryptionCoordinateSet;
035    import org.cumulus4j.store.model.FieldMeta;
036    import org.cumulus4j.store.model.IndexEntryContainerSize;
037    import org.cumulus4j.store.model.Sequence;
038    import org.cumulus4j.store.resource.ResourceHelper;
039    import org.datanucleus.PersistenceConfiguration;
040    import org.datanucleus.plugin.ConfigurationElement;
041    import org.datanucleus.plugin.PluginManager;
042    import org.datanucleus.store.StoreManager;
043    import org.datanucleus.store.connection.AbstractConnectionFactory;
044    import org.datanucleus.store.connection.AbstractManagedConnection;
045    import org.datanucleus.store.connection.ManagedConnection;
046    import org.datanucleus.util.NucleusLogger;
047    import org.datanucleus.util.StringUtils;
048    
049    /**
050     * <p>
051     * Connection factory implementation for Cumulus4j-connections.
052     * </p><p>
053     * A "connection" in Cumulus4J is a <code>PersistenceManager</code> for the backing datastore.
054     * When the transaction in Cumulus4J is committed, the equivalent transaction is committed in the PM(s) of the
055     * backing datastore(s).
056     * </p><p>
057     * How to configure a connection factory is documented on
058     * <a href="http://cumulus4j.org/1.0.0/documentation/persistence-api.html">Persistence API</a>.
059     * </p> 
060     * @author Marco หงุ่ยตระกูล-Schulze - marco at nightlabs dot de
061     */
062    public class Cumulus4jConnectionFactory extends AbstractConnectionFactory
063    {
064            /** PMF for DataEntry, ClassMeta+FieldMeta, and optionally index data (if not using pmfIndex). */
065            private PersistenceManagerFactory pmf;
066    
067            /** Optional PMF for index data. */
068            private PersistenceManagerFactory pmfIndex;
069    
070            private String[] propertiesToForward = {
071                            "datanucleus.ConnectionDriverName",
072                            "datanucleus.ConnectionURL",
073                            "datanucleus.ConnectionUserName",
074                            "datanucleus.ConnectionFactory",
075                            "datanucleus.ConnectionFactoryName",
076                            "datanucleus.ConnectionFactory2",
077                            "datanucleus.ConnectionFactory2Name"
078            };
079    
080            private static final String CUMULUS4J_PROPERTY_PREFIX = "cumulus4j.";
081            private static final String CUMULUS4J_INDEX_PROPERTY_PREFIX = "cumulus4j.index.";
082    
083            private static final String[] CUMULUS4J_FORWARD_PROPERTY_PREFIXES = {
084                    CUMULUS4J_PROPERTY_PREFIX + "datanucleus.",
085                    CUMULUS4J_PROPERTY_PREFIX + "javax."
086            };
087    
088            private static final String[] CUMULUS4J_INDEX_FORWARD_PROPERTY_PREFIXES = {
089                    CUMULUS4J_INDEX_PROPERTY_PREFIX + "datanucleus.",
090                    CUMULUS4J_INDEX_PROPERTY_PREFIX + "javax."
091            };
092    
093            public Cumulus4jConnectionFactory(StoreManager storeMgr, String resourceType) {
094                    super(storeMgr, resourceType);
095    
096                    Map<String, Object> backendProperties = ResourceHelper.getCumulus4jBackendProperties();
097                    Map<String, Object> backendIndexProperties = null;
098    
099                    PersistenceConfiguration persistenceConfiguration = storeMgr.getNucleusContext().getPersistenceConfiguration();
100    
101                    // Copy the properties that are directly (as is) forwarded.
102                    for (String propKey : propertiesToForward) {
103                            Object propValue = persistenceConfiguration.getProperty(propKey);
104                            if (propValue != null)
105                                    backendProperties.put(propKey.toLowerCase(Locale.ENGLISH), propValue);
106                    }
107    
108                    // Copy the properties that are prefixed with "cumulus4j." and thus forwarded.
109                    for (Map.Entry<String, Object> me : persistenceConfiguration.getPersistenceProperties().entrySet()) {
110                            if (me.getKey() == null) // don't know if null keys can ever occur, but better play safe
111                                    continue;
112    
113                            for (String prefix : CUMULUS4J_FORWARD_PROPERTY_PREFIXES) {
114                                    if (me.getKey().startsWith(prefix)) {
115                                            String propKey = me.getKey().substring(CUMULUS4J_PROPERTY_PREFIX.length());
116                                            backendProperties.put(propKey.toLowerCase(Locale.ENGLISH), me.getValue());
117                                    }
118                            }
119    
120                            for (String prefix : CUMULUS4J_INDEX_FORWARD_PROPERTY_PREFIXES) {
121                                    if (me.getKey().startsWith(prefix)) {
122                                            String propKey = me.getKey().substring(CUMULUS4J_INDEX_PROPERTY_PREFIX.length());
123                                            if (backendIndexProperties == null) {
124                                                    backendIndexProperties = new HashMap<String, Object>(backendProperties);
125                                            }
126                                            backendIndexProperties.put(propKey.toLowerCase(Locale.ENGLISH), me.getValue());
127                                    }
128                            }
129                    }
130    
131                    // The password might be encrypted, but the getConnectionPassword(...) method decrypts it.
132                    String pw = storeMgr.getConnectionPassword();
133                    if (pw != null) {
134                            backendProperties.put("datanucleus.ConnectionPassword".toLowerCase(Locale.ENGLISH), pw);
135                    }
136    
137                    // This block is an alternative to getting Extent of each Cumulus4j schema class
138    /*              StringBuffer classNameStr = new StringBuffer();
139                    classNameStr.append(ClassMeta.class.getName()).append(",");
140                    classNameStr.append(DataEntry.class.getName()).append(",");
141                    classNameStr.append(FieldMeta.class.getName()).append(",");
142                    classNameStr.append(IndexEntryContainerSize.class.getName()).append(",");
143                    classNameStr.append(Sequence.class.getName());
144                    PluginManager pluginMgr = storeMgr.getNucleusContext().getPluginManager();
145                    ConfigurationElement[] elems = pluginMgr.getConfigurationElementsForExtension(
146                                    "org.cumulus4j.store.index_mapping", null, null);
147                    if (elems != null && elems.length > 0) {
148                            HashSet<Class> initialisedClasses = new HashSet<Class>();
149                            for (int i=0;i<elems.length;i++) {
150                                    String indexTypeName = elems[i].getAttribute("index-entry-type");
151                                    Class cls = pluginMgr.loadClass("org.cumulus4j.store.index_mapping", indexTypeName);
152                                    if (!initialisedClasses.contains(cls)) {
153                                            initialisedClasses.add(cls);
154                                            classNameStr.append(",").append(indexTypeName);
155                                    }
156                            }
157                    }
158                    cumulus4jBackendProperties.put("datanucleus.autostartmechanism", "Classes");
159                    cumulus4jBackendProperties.put("datanucleus.autostartclassnames", classNameStr.toString());*/
160    
161                    // PMF for data (and optionally index)
162                    if (backendIndexProperties == null) {
163                            NucleusLogger.GENERAL.debug("Creating PMF for Data+Index with the following properties : "+StringUtils.mapToString(backendProperties));
164                    }
165                    else {
166                            NucleusLogger.GENERAL.debug("Creating PMF for Data with the following properties : "+StringUtils.mapToString(backendProperties));
167                    }
168                    pmf = JDOHelper.getPersistenceManagerFactory(backendProperties);
169    
170                    // initialise meta-data (which partially tests it)
171                    PersistenceManager pm = pmf.getPersistenceManager();
172                    try {
173                            // Class structure meta-data
174                            pm.getExtent(ClassMeta.class);
175                            pm.getExtent(FieldMeta.class);
176    
177                            // Data
178                            pm.getExtent(DataEntry.class);
179    
180                            // Sequence for ID generation
181                            pm.getExtent(Sequence.class);
182    
183                            // Mapping for encryption settings (encryption algorithm, mode, padding, MAC, etc.
184                            // are mapped to a number which reduces the size of each record)
185                            pm.getExtent(EncryptionCoordinateSet.class);
186    
187                            if (backendIndexProperties == null) {
188                                    // Index
189                                    initialiseIndexMetaData(pm, storeMgr);
190                            }
191                    } finally {
192                            pm.close();
193                    }
194    
195                    if (backendIndexProperties != null) {
196                            // PMF for index data
197                            NucleusLogger.GENERAL.debug("Creating PMF for Index data with the following properties : "+StringUtils.mapToString(backendIndexProperties));
198                            pmfIndex = JDOHelper.getPersistenceManagerFactory(backendIndexProperties);
199    
200                            PersistenceManager pmIndex = pmfIndex.getPersistenceManager();
201                            try {
202                                    // Class structure meta-data
203                                    pmIndex.getExtent(ClassMeta.class);
204                                    pmIndex.getExtent(FieldMeta.class);
205    
206                                    // Index
207                                    initialiseIndexMetaData(pmIndex, storeMgr);
208                            } finally {
209                                    pmIndex.close();
210                            }
211                    }
212            }
213    
214            private static void initialiseIndexMetaData(PersistenceManager pm, StoreManager storeMgr)
215            {
216                    // While it is not necessary to initialise the meta-data now (can be done lazily,
217                    // when the index is used), it is still better as it prevents delays when the
218                    // data is persisted.
219                    pm.getExtent(IndexEntryContainerSize.class);
220    
221                    PluginManager pluginMgr = storeMgr.getNucleusContext().getPluginManager();
222                    ConfigurationElement[] elems = pluginMgr.getConfigurationElementsForExtension(
223                                    "org.cumulus4j.store.index_mapping", null, null);
224                    if (elems != null && elems.length > 0) {
225                            HashSet<Class<?>> initialisedClasses = new HashSet<Class<?>>();
226                            for (int i=0;i<elems.length;i++) {
227                                    String indexTypeName = elems[i].getAttribute("index-entry-type");
228                                    Class<?> cls = pluginMgr.loadClass("org.cumulus4j.store.index_mapping", indexTypeName);
229                                    if (!initialisedClasses.contains(cls)) {
230                                            initialisedClasses.add(cls);
231                                            pm.getExtent(cls);
232                                    }
233                            }
234                    }
235            }
236    
237            public PersistenceManagerFactory getPMFData() {
238                    return pmf;
239            }
240    
241            public PersistenceManagerFactory getPMFIndex() {
242                    return pmfIndex;
243            }
244    
245            @Override
246            public ManagedConnection createManagedConnection(Object poolKey, @SuppressWarnings("unchecked") Map transactionOptions)
247            {
248                    return new Cumulus4jManagedConnection(poolKey, transactionOptions);
249            }
250    
251            private class Cumulus4jManagedConnection extends AbstractManagedConnection
252            {
253                    @SuppressWarnings("unused")
254                    private Object poolKey;
255    
256                    @SuppressWarnings({"unchecked","unused"})
257                    private Map options;
258    
259                    PersistenceManagerConnection pmConnection;
260    
261                    @Override
262                    public XAResource getXAResource() {
263                            return new Cumulus4jXAResource((PersistenceManagerConnection)getConnection());
264                    }
265    
266                    public Cumulus4jManagedConnection(Object poolKey, @SuppressWarnings("unchecked") Map options) {
267                            this.poolKey = poolKey;
268                            this.options = options;
269                    }
270    
271                    @Override
272                    public void close() {
273                            if (pmConnection != null) {
274                                    PersistenceManager dataPM = pmConnection.getDataPM();
275                                    dataPM.close();
276                                    if (pmConnection.indexHasOwnPM()) {
277                                            PersistenceManager indexPM = pmConnection.getIndexPM();
278                                            indexPM.close();
279                                    }
280                                    pmConnection = null;
281                            }
282                    }
283    
284                    @Override
285                    public Object getConnection() {
286                            if (pmConnection == null) {
287                                    this.pmConnection = new PersistenceManagerConnection(pmf.getPersistenceManager(),
288                                                    pmfIndex != null ? pmfIndex.getPersistenceManager() : null);
289                            }
290                            return pmConnection;
291                    }
292            }
293    
294            class Cumulus4jXAResource implements XAResource {
295                    private PersistenceManagerConnection pmConnection;
296                    //        private Xid xid;
297    
298                    Cumulus4jXAResource(PersistenceManagerConnection pmConn) {
299                            this.pmConnection = pmConn;
300                    }
301    
302                    @Override
303                    public void start(Xid xid, int arg1) throws XAException {
304                            //              if (this.xid != null)
305                            //                      throw new IllegalStateException("Transaction already started! Cannot start twice!");
306    
307                            PersistenceManager dataPM = pmConnection.getDataPM();
308                            dataPM.currentTransaction().begin();
309                            if (pmConnection.indexHasOwnPM()) {
310                                    PersistenceManager indexPM = pmConnection.getIndexPM();
311                                    indexPM.currentTransaction().begin();
312                            }
313                            //              this.xid = xid;
314                    }
315    
316                    @Override
317                    public void commit(Xid xid, boolean arg1) throws XAException {
318                            //              if (this.xid == null)
319                            //                      throw new IllegalStateException("Transaction not active!");
320                            //
321                            //              if (!this.xid.equals(xid))
322                            //                      throw new IllegalStateException("Transaction mismatch! this.xid=" + this.xid + " otherXid=" + xid);
323    
324                            PersistenceManager dataPM = pmConnection.getDataPM();
325                            dataPM.currentTransaction().commit();
326                            if (pmConnection.indexHasOwnPM()) {
327                                    PersistenceManager indexPM = pmConnection.getIndexPM();
328                                    indexPM.currentTransaction().commit();
329                            }
330    
331                            //            this.xid = null;
332                    }
333    
334                    @Override
335                    public void rollback(Xid xid) throws XAException {
336                            //              if (this.xid == null)
337                            //                      throw new IllegalStateException("Transaction not active!");
338                            //
339                            //              if (!this.xid.equals(xid))
340                            //                      throw new IllegalStateException("Transaction mismatch! this.xid=" + this.xid + " otherXid=" + xid);
341    
342                            PersistenceManager dataPM = pmConnection.getDataPM();
343                            dataPM.currentTransaction().rollback();
344                            if (pmConnection.indexHasOwnPM()) {
345                                    PersistenceManager indexPM = pmConnection.getIndexPM();
346                                    indexPM.currentTransaction().rollback();
347                            }
348    
349                            //            this.xid = null;
350                    }
351    
352                    @Override
353                    public void end(Xid arg0, int arg1) throws XAException {
354                            //ignore
355                    }
356    
357                    @Override
358                    public void forget(Xid arg0) throws XAException {
359                            //ignore
360                    }
361    
362                    @Override
363                    public int getTransactionTimeout() throws XAException {
364                            return 0;
365                    }
366    
367                    @Override
368                    public boolean isSameRM(XAResource resource) throws XAException {
369                            if ((resource instanceof Cumulus4jXAResource) && pmConnection.equals(((Cumulus4jXAResource)resource).pmConnection))
370                                    return true;
371                            else
372                                    return false;
373                    }
374    
375                    @Override
376                    public int prepare(Xid arg0) throws XAException {
377                            return 0;
378                    }
379    
380                    @Override
381                    public Xid[] recover(int arg0) throws XAException {
382                            throw new XAException("Unsupported operation");
383                    }
384    
385                    @Override
386                    public boolean setTransactionTimeout(int arg0) throws XAException {
387                            return false;
388                    }
389            }
390    }