001    /*
002     * Cumulus4j - Securing your data in the cloud - http://cumulus4j.org
003     * Copyright (C) 2011 NightLabs Consulting GmbH
004     *
005     * This program is free software: you can redistribute it and/or modify
006     * it under the terms of the GNU Affero General Public License as
007     * published by the Free Software Foundation, either version 3 of the
008     * License, or (at your option) any later version.
009     *
010     * This program is distributed in the hope that it will be useful,
011     * but WITHOUT ANY WARRANTY; without even the implied warranty of
012     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013     * GNU Affero General Public License for more details.
014     *
015     * You should have received a copy of the GNU Affero General Public License
016     * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017     */
018    package org.cumulus4j.store;
019    
020    import java.util.HashMap;
021    import java.util.Locale;
022    import java.util.Map;
023    import java.util.Set;
024    
025    import javax.jdo.JDOHelper;
026    import javax.jdo.PersistenceManager;
027    import javax.jdo.PersistenceManagerFactory;
028    import javax.transaction.xa.XAException;
029    import javax.transaction.xa.XAResource;
030    import javax.transaction.xa.Xid;
031    
032    import org.cumulus4j.store.model.ClassMeta;
033    import org.cumulus4j.store.model.DataEntry;
034    import org.cumulus4j.store.model.DatastoreVersion;
035    import org.cumulus4j.store.model.EmbeddedClassMeta;
036    import org.cumulus4j.store.model.EmbeddedFieldMeta;
037    import org.cumulus4j.store.model.EncryptionCoordinateSet;
038    import org.cumulus4j.store.model.FieldMeta;
039    import org.cumulus4j.store.model.IndexEntry;
040    import org.cumulus4j.store.model.IndexEntryContainerSize;
041    import org.cumulus4j.store.model.Sequence2;
042    import org.cumulus4j.store.resource.ResourceHelper;
043    import org.datanucleus.PersistenceConfiguration;
044    import org.datanucleus.store.StoreManager;
045    import org.datanucleus.store.connection.AbstractConnectionFactory;
046    import org.datanucleus.store.connection.AbstractManagedConnection;
047    import org.datanucleus.store.connection.ManagedConnection;
048    import org.datanucleus.util.NucleusLogger;
049    import org.datanucleus.util.StringUtils;
050    
051    /**
052     * <p>
053     * Connection factory implementation for Cumulus4j-connections.
054     * </p><p>
055     * A "connection" in Cumulus4J is a <code>PersistenceManager</code> for the backing datastore.
056     * When the transaction in Cumulus4J is committed, the equivalent transaction is committed in the PM(s) of the
057     * backing datastore(s).
058     * </p><p>
059     * How to configure a connection factory is documented on
060     * <a href="http://cumulus4j.org/1.1.0/documentation/persistence-api.html">Persistence API</a>.
061     * </p>
062     * @author Marco หงุ่ยตระกูล-Schulze - marco at nightlabs dot de
063     */
064    public class Cumulus4jConnectionFactory extends AbstractConnectionFactory
065    {
066            /** PMF for DataEntry, ClassMeta+FieldMeta, and optionally index data (if not using pmfIndex). */
067            private PersistenceManagerFactory pmf;
068    
069            /** Optional PMF for index data. */
070            private PersistenceManagerFactory pmfIndex;
071    
072            private String[] propertiesToForward = {
073                            "datanucleus.ConnectionDriverName",
074                            "datanucleus.ConnectionURL",
075                            "datanucleus.ConnectionUserName",
076                            "datanucleus.ConnectionFactory",
077                            "datanucleus.ConnectionFactoryName",
078                            "datanucleus.ConnectionFactory2",
079                            "datanucleus.ConnectionFactory2Name"
080            };
081    
082            private static final String CUMULUS4J_PROPERTY_PREFIX = "cumulus4j.";
083            private static final String CUMULUS4J_INDEX_PROPERTY_PREFIX = "cumulus4j.index.";
084    
085            private static final String[] CUMULUS4J_FORWARD_PROPERTY_PREFIXES = {
086                    CUMULUS4J_PROPERTY_PREFIX + "datanucleus.",
087                    CUMULUS4J_PROPERTY_PREFIX + "javax."
088            };
089    
090            private static final String[] CUMULUS4J_INDEX_FORWARD_PROPERTY_PREFIXES = {
091                    CUMULUS4J_INDEX_PROPERTY_PREFIX + "datanucleus.",
092                    CUMULUS4J_INDEX_PROPERTY_PREFIX + "javax."
093            };
094    
095            public Cumulus4jConnectionFactory(StoreManager storeMgr, String resourceType) {
096                    super(storeMgr, resourceType);
097    
098                    Map<String, Object> backendProperties = ResourceHelper.getCumulus4jBackendProperties();
099                    Map<String, Object> persistenceProperties = ResourceHelper.getCumulus4jPersistenceProperties();
100                    Map<String, Object> backendIndexProperties = null;
101    
102                    PersistenceConfiguration persistenceConfiguration = storeMgr.getNucleusContext().getPersistenceConfiguration();
103                    for (Map.Entry<String, Object> me : persistenceProperties.entrySet()) {
104                            if (me.getKey() == null) // can't happen, but better play safe
105                                    continue;
106    
107                            if (!persistenceConfiguration.hasProperty(me.getKey())) // we load our defaults after the user config, hence we must not override!
108                                    persistenceConfiguration.setProperty(me.getKey(), me.getValue());
109                    }
110    
111                    // Copy the properties that are directly (as is) forwarded.
112                    for (String propKey : propertiesToForward) {
113                            Object propValue = persistenceConfiguration.getProperty(propKey);
114                            if (propValue != null)
115                                    backendProperties.put(propKey.toLowerCase(Locale.ENGLISH), propValue);
116                    }
117    
118                    // Copy the properties that are prefixed with "cumulus4j." and thus forwarded.
119                    for (Map.Entry<String, Object> me : persistenceConfiguration.getPersistenceProperties().entrySet()) {
120                            if (me.getKey() == null) // don't know if null keys can ever occur, but better play safe
121                                    continue;
122    
123                            for (String prefix : CUMULUS4J_FORWARD_PROPERTY_PREFIXES) {
124                                    if (me.getKey().startsWith(prefix)) {
125                                            String propKey = me.getKey().substring(CUMULUS4J_PROPERTY_PREFIX.length());
126                                            backendProperties.put(propKey.toLowerCase(Locale.ENGLISH), me.getValue());
127                                    }
128                            }
129    
130                            for (String prefix : CUMULUS4J_INDEX_FORWARD_PROPERTY_PREFIXES) {
131                                    if (me.getKey().startsWith(prefix)) {
132                                            String propKey = me.getKey().substring(CUMULUS4J_INDEX_PROPERTY_PREFIX.length());
133                                            if (backendIndexProperties == null) {
134                                                    backendIndexProperties = new HashMap<String, Object>(backendProperties);
135                                            }
136                                            backendIndexProperties.put(propKey.toLowerCase(Locale.ENGLISH), me.getValue());
137                                    }
138                            }
139                    }
140    
141                    // The password might be encrypted, but the getConnectionPassword(...) method decrypts it.
142                    String pw = storeMgr.getConnectionPassword();
143                    if (pw != null) {
144                            backendProperties.put("datanucleus.ConnectionPassword".toLowerCase(Locale.ENGLISH), pw);
145                    }
146    
147                    // This block is an alternative to getting Extent of each Cumulus4j schema class
148    /*              StringBuffer classNameStr = new StringBuffer();
149                    classNameStr.append(ClassMeta.class.getName()).append(",");
150                    classNameStr.append(DataEntry.class.getName()).append(",");
151                    classNameStr.append(FieldMeta.class.getName()).append(",");
152                    classNameStr.append(IndexEntryContainerSize.class.getName()).append(",");
153                    classNameStr.append(Sequence.class.getName());
154                    PluginManager pluginMgr = storeMgr.getNucleusContext().getPluginManager();
155                    ConfigurationElement[] elems = pluginMgr.getConfigurationElementsForExtension(
156                                    "org.cumulus4j.store.index_mapping", null, null);
157                    if (elems != null && elems.length > 0) {
158                            HashSet<Class> initialisedClasses = new HashSet<Class>();
159                            for (int i=0;i<elems.length;i++) {
160                                    String indexTypeName = elems[i].getAttribute("index-entry-type");
161                                    Class cls = pluginMgr.loadClass("org.cumulus4j.store.index_mapping", indexTypeName);
162                                    if (!initialisedClasses.contains(cls)) {
163                                            initialisedClasses.add(cls);
164                                            classNameStr.append(",").append(indexTypeName);
165                                    }
166                            }
167                    }
168                    cumulus4jBackendProperties.put("datanucleus.autostartmechanism", "Classes");
169                    cumulus4jBackendProperties.put("datanucleus.autostartclassnames", classNameStr.toString());*/
170    
171                    // PMF for data (and optionally index)
172                    if (backendIndexProperties == null) {
173                            NucleusLogger.GENERAL.debug("Creating PMF for Data+Index with the following properties : "+StringUtils.mapToString(backendProperties));
174                    }
175                    else {
176                            NucleusLogger.GENERAL.debug("Creating PMF for Data with the following properties : "+StringUtils.mapToString(backendProperties));
177                    }
178                    pmf = JDOHelper.getPersistenceManagerFactory(backendProperties);
179    
180                    // initialise meta-data (which partially tests it)
181                    PersistenceManager pm = pmf.getPersistenceManager();
182                    try {
183                            // Class structure meta-data
184                            pm.getExtent(ClassMeta.class);
185                            pm.getExtent(FieldMeta.class);
186                            pm.getExtent(EmbeddedClassMeta.class);
187                            pm.getExtent(EmbeddedFieldMeta.class);
188    
189                            // Data
190                            pm.getExtent(DataEntry.class);
191    
192                            // Sequence for ID generation
193                            pm.getExtent(Sequence2.class);
194    
195                            // Mapping for encryption settings (encryption algorithm, mode, padding, MAC, etc.
196                            // are mapped to a number which reduces the size of each record)
197                            pm.getExtent(EncryptionCoordinateSet.class);
198    
199                            // versioning of datastore structure
200                            pm.getExtent(DatastoreVersion.class);
201    
202                            if (backendIndexProperties == null) {
203                                    // Index
204                                    initialiseIndexMetaData(pm, storeMgr);
205                            }
206                    } finally {
207                            pm.close();
208                    }
209    
210                    if (backendIndexProperties != null) {
211                            // PMF for index data
212                            NucleusLogger.GENERAL.debug("Creating PMF for Index data with the following properties : "+StringUtils.mapToString(backendIndexProperties));
213                            pmfIndex = JDOHelper.getPersistenceManagerFactory(backendIndexProperties);
214    
215                            PersistenceManager pmIndex = pmfIndex.getPersistenceManager();
216                            try {
217                                    // Class structure meta-data
218                                    pmIndex.getExtent(ClassMeta.class);
219                                    pmIndex.getExtent(FieldMeta.class);
220    
221                                    // Index
222                                    initialiseIndexMetaData(pmIndex, storeMgr);
223    
224                                    // versioning of datastore structure
225                                    pm.getExtent(DatastoreVersion.class);
226                            } finally {
227                                    pmIndex.close();
228                            }
229                    }
230            }
231    
232            private static void initialiseIndexMetaData(PersistenceManager pm, StoreManager storeMgr)
233            {
234                    // While it is not necessary to initialise the meta-data now (can be done lazily,
235                    // when the index is used), it is still better as it prevents delays when the
236                    // data is persisted.
237                    // Furthermore, if the underlying database uses transactional DDL (like PostgreSQL, MSSQL
238                    // and others), and a separate JDBC connection is used for DDL (like it must be in
239                    // a JEE server), it is essentially required to initialise the meta-data in a separate
240                    // transaction before actually using the tables.
241                    pm.getExtent(IndexEntryContainerSize.class);
242    
243                    Set<Class<? extends IndexEntry>> indexEntryClasses = ((Cumulus4jStoreManager)storeMgr).getIndexFactoryRegistry().getIndexEntryClasses();
244                    for (Class<? extends IndexEntry> indexEntryClass : indexEntryClasses) {
245                            pm.getExtent(indexEntryClass);
246                    }
247    
248    //              PluginManager pluginMgr = storeMgr.getNucleusContext().getPluginManager();
249    //              ConfigurationElement[] elems = pluginMgr.getConfigurationElementsForExtension(
250    //                              "org.cumulus4j.store.index_mapping", null, null);
251    //              if (elems != null && elems.length > 0) {
252    //                      HashSet<Class<?>> initialisedClasses = new HashSet<Class<?>>();
253    //                      for (int i=0;i<elems.length;i++) {
254    //                              String indexTypeName = elems[i].getAttribute("index-entry-type");
255    //                              Class<?> cls = pluginMgr.loadClass("org.cumulus4j.store.index_mapping", indexTypeName);
256    //                              if (!initialisedClasses.contains(cls)) {
257    //                                      initialisedClasses.add(cls);
258    //                                      pm.getExtent(cls);
259    //                              }
260    //                      }
261    //              }
262            }
263    
264            public PersistenceManagerFactory getPMFData() {
265                    return pmf;
266            }
267    
268            public PersistenceManagerFactory getPMFIndex() {
269                    return pmfIndex;
270            }
271    
272            @Override
273            public ManagedConnection createManagedConnection(Object poolKey, @SuppressWarnings("rawtypes") Map transactionOptions)
274            {
275                    return new Cumulus4jManagedConnection(poolKey, transactionOptions);
276            }
277    
278            /**
279             * Cumulus4j-specific {@link ManagedConnection} implementation. Avoid to access this specific class whenever possible!
280             * @author mschulze
281             */
282            class Cumulus4jManagedConnection extends AbstractManagedConnection
283            {
284                    private Object poolKey;
285    
286                    @SuppressWarnings({"rawtypes","unused"})
287                    private Map options;
288    
289                    PersistenceManagerConnection pmConnection;
290    
291                    @Override
292                    public XAResource getXAResource() {
293                            return new Cumulus4jXAResource((PersistenceManagerConnection)getConnection());
294                    }
295    
296                    public Cumulus4jManagedConnection(Object poolKey, @SuppressWarnings("rawtypes") Map options) {
297                            this.poolKey = poolKey;
298                            this.options = options;
299                    }
300    
301                    public Object getPoolKey() {
302                            return poolKey;
303                    }
304    
305                    @Override
306                    public void close() {
307                            if (pmConnection != null) {
308                                    PersistenceManager dataPM = pmConnection.getDataPM();
309                                    dataPM.close();
310                                    if (pmConnection.indexHasOwnPM()) {
311                                            PersistenceManager indexPM = pmConnection.getIndexPM();
312                                            indexPM.close();
313                                    }
314                                    pmConnection = null;
315                            }
316                    }
317    
318                    @Override
319                    public Object getConnection() {
320                            if (pmConnection == null) {
321                                    this.pmConnection = new PersistenceManagerConnection(pmf.getPersistenceManager(),
322                                                    pmfIndex != null ? pmfIndex.getPersistenceManager() : null);
323                            }
324                            return pmConnection;
325                    }
326            }
327    
328            class Cumulus4jXAResource implements XAResource {
329                    private PersistenceManagerConnection pmConnection;
330                    //        private Xid xid;
331    
332                    Cumulus4jXAResource(PersistenceManagerConnection pmConn) {
333                            this.pmConnection = pmConn;
334                    }
335    
336                    @Override
337                    public void start(Xid xid, int arg1) throws XAException {
338                            //              if (this.xid != null)
339                            //                      throw new IllegalStateException("Transaction already started! Cannot start twice!");
340    
341                            PersistenceManager dataPM = pmConnection.getDataPM();
342                            dataPM.currentTransaction().begin();
343                            if (pmConnection.indexHasOwnPM()) {
344                                    PersistenceManager indexPM = pmConnection.getIndexPM();
345                                    indexPM.currentTransaction().begin();
346                            }
347                            //              this.xid = xid;
348                    }
349    
350                    @Override
351                    public void commit(Xid xid, boolean arg1) throws XAException {
352                            //              if (this.xid == null)
353                            //                      throw new IllegalStateException("Transaction not active!");
354                            //
355                            //              if (!this.xid.equals(xid))
356                            //                      throw new IllegalStateException("Transaction mismatch! this.xid=" + this.xid + " otherXid=" + xid);
357    
358                            PersistenceManager dataPM = pmConnection.getDataPM();
359                            dataPM.currentTransaction().commit();
360                            if (pmConnection.indexHasOwnPM()) {
361                                    PersistenceManager indexPM = pmConnection.getIndexPM();
362                                    indexPM.currentTransaction().commit();
363                            }
364    
365                            //            this.xid = null;
366                    }
367    
368                    @Override
369                    public void rollback(Xid xid) throws XAException {
370                            //              if (this.xid == null)
371                            //                      throw new IllegalStateException("Transaction not active!");
372                            //
373                            //              if (!this.xid.equals(xid))
374                            //                      throw new IllegalStateException("Transaction mismatch! this.xid=" + this.xid + " otherXid=" + xid);
375    
376                            PersistenceManager dataPM = pmConnection.getDataPM();
377                            dataPM.currentTransaction().rollback();
378                            if (pmConnection.indexHasOwnPM()) {
379                                    PersistenceManager indexPM = pmConnection.getIndexPM();
380                                    indexPM.currentTransaction().rollback();
381                            }
382    
383                            //            this.xid = null;
384                    }
385    
386                    @Override
387                    public void end(Xid arg0, int arg1) throws XAException {
388                            //ignore
389                    }
390    
391                    @Override
392                    public void forget(Xid arg0) throws XAException {
393                            //ignore
394                    }
395    
396                    @Override
397                    public int getTransactionTimeout() throws XAException {
398                            return 0;
399                    }
400    
401                    @Override
402                    public boolean isSameRM(XAResource resource) throws XAException {
403                            if ((resource instanceof Cumulus4jXAResource) && pmConnection.equals(((Cumulus4jXAResource)resource).pmConnection))
404                                    return true;
405                            else
406                                    return false;
407                    }
408    
409                    @Override
410                    public int prepare(Xid arg0) throws XAException {
411                            return 0;
412                    }
413    
414                    @Override
415                    public Xid[] recover(int arg0) throws XAException {
416                            throw new XAException("Unsupported operation");
417                    }
418    
419                    @Override
420                    public boolean setTransactionTimeout(int arg0) throws XAException {
421                            return false;
422                    }
423            }
424    }