001    package org.cumulus4j.store.datastoreversion;
002    
003    import java.io.StringReader;
004    import java.io.StringWriter;
005    import java.util.ArrayList;
006    import java.util.Collections;
007    import java.util.Date;
008    import java.util.HashMap;
009    import java.util.HashSet;
010    import java.util.List;
011    import java.util.Locale;
012    import java.util.Map;
013    import java.util.Properties;
014    import java.util.Set;
015    import java.util.concurrent.atomic.AtomicBoolean;
016    
017    import javax.jdo.FetchPlan;
018    import javax.jdo.PersistenceManager;
019    
020    import org.cumulus4j.store.Cumulus4jStoreManager;
021    import org.cumulus4j.store.WorkInProgressException;
022    import org.cumulus4j.store.crypto.CryptoContext;
023    import org.cumulus4j.store.datastoreversion.command.IntroduceKeyStoreRefID;
024    import org.cumulus4j.store.datastoreversion.command.MigrateToSequence2;
025    import org.cumulus4j.store.datastoreversion.command.MinimumCumulus4jVersion;
026    import org.cumulus4j.store.datastoreversion.command.RecreateIndex;
027    import org.cumulus4j.store.model.DatastoreVersion;
028    import org.cumulus4j.store.model.DatastoreVersionDAO;
029    import org.cumulus4j.store.model.KeyStoreRef;
030    
031    /**
032     * @author Marco หงุ่ยตระกูล-Schulze - marco at nightlabs dot de
033     */
034    @SuppressWarnings("unchecked")
035    public class DatastoreVersionManager {
036    
037            public static final int MANAGER_VERSION = 1;
038    
039            private static final Class<?>[] datastoreVersionCommandClasses = {
040                    // MinimumCumulus4jVersion should be the very first entry!
041                    MinimumCumulus4jVersion.class,
042    
043                    IntroduceKeyStoreRefID.class,
044                    MigrateToSequence2.class,
045                    RecreateIndex.class
046            };
047    
048            private static final List<Class<? extends DatastoreVersionCommand>> datastoreVersionCommandClassList;
049            static {
050                    List<Class<? extends DatastoreVersionCommand>> list = new ArrayList<Class<? extends DatastoreVersionCommand>>(datastoreVersionCommandClasses.length);
051                    for (Class<?> c : datastoreVersionCommandClasses) {
052                            if (c == null)
053                                    throw new IllegalStateException("datastoreVersionCommandClasses contains null element!");
054    
055                            if (!DatastoreVersionCommand.class.isAssignableFrom(c))
056                                    throw new IllegalStateException(String.format("%s does not implement %s!", c.getName(), DatastoreVersionCommand.class.getName()));
057    
058                            list.add((Class<? extends DatastoreVersionCommand>) c);
059                    }
060                    datastoreVersionCommandClassList = Collections.unmodifiableList(list);
061            }
062    
063            private Cumulus4jStoreManager storeManager;
064            private Set<Integer> performedKeyStoreRefIDs = Collections.synchronizedSet(new HashSet<Integer>());
065            private AtomicBoolean performedGlobally = new AtomicBoolean();
066    
067            public DatastoreVersionManager(Cumulus4jStoreManager storeManager) {
068                    if (storeManager == null)
069                            throw new IllegalArgumentException("storeManager == null");
070    
071                    this.storeManager = storeManager;
072            }
073    
074            public synchronized void applyOnce(CryptoContext cryptoContext) {
075                    final Integer keyStoreRefID = cryptoContext.getKeyStoreRefID();
076    
077                    // We do not need synchronisation here, because the 'performedKeyStoreRefIDs' is a synchronized set
078                    // and only one single thread will succeed in adding the keyStoreRefID.
079                    // WRONG! We do need synchronisation, because we must ensure that there is no access to the datastore,
080                    // before it has been converted to the newest version. Hence this method is 'synchronized'.
081                    // It's only the question how we can do this in a cluster-environment. But that does not matter, right now.
082                    // We might later add some DB-based lock.
083                    // Marco :-)
084                    boolean error1 = true;
085                    try {
086                            // Immediately set 'performed' to prevent endless recursions. Remove again in case of exception!
087                            if (performedKeyStoreRefIDs.add(keyStoreRefID)) {
088    
089                                    // Again no need for synchronisation because of AtomicBoolean 'performedGlobally'.
090                                    boolean error2 = true;
091                                    try {
092                                            if (performedGlobally.compareAndSet(false, true)) {
093                                                    apply(cryptoContext, KeyStoreRef.GLOBAL_KEY_STORE_REF_ID);
094                                            }
095                                            error2 = false;
096                                    } finally {
097                                            if (error2)
098                                                    performedGlobally.set(false);
099                                    }
100    
101                                    apply(cryptoContext, keyStoreRefID);
102                            }
103    
104                            error1 = false;
105                    } finally {
106                            if (error1)
107                                    performedKeyStoreRefIDs.remove(keyStoreRefID);
108                    }
109            }
110    
111            protected void apply(CryptoContext cryptoContext, int keyStoreRefID) {
112    
113                    List<PersistenceManager> persistenceManagers = new ArrayList<PersistenceManager>(2);
114                    persistenceManagers.add(cryptoContext.getPersistenceManagerForData());
115                    if (cryptoContext.getPersistenceManagerForData() != cryptoContext.getPersistenceManagerForIndex())
116                            persistenceManagers.add(cryptoContext.getPersistenceManagerForIndex());
117    
118                    for (PersistenceManager pm : persistenceManagers) {
119                            List<DatastoreVersionCommand> datastoreVersionCommands = createDatastoreVersionCommands();
120    
121                            DatastoreVersionDAO datastoreVersionDAO = new DatastoreVersionDAO(pm);
122                            Map<String, DatastoreVersion> datastoreVersionID2DatastoreVersionMap = check(
123                                            cryptoContext, keyStoreRefID, pm, datastoreVersionDAO, datastoreVersionCommands
124                            );
125                            for (DatastoreVersionCommand datastoreVersionCommand : datastoreVersionCommands) {
126                                    if (KeyStoreRef.GLOBAL_KEY_STORE_REF_ID == keyStoreRefID && datastoreVersionCommand.isKeyStoreDependent())
127                                            continue;
128    
129                                    if (KeyStoreRef.GLOBAL_KEY_STORE_REF_ID != keyStoreRefID && !datastoreVersionCommand.isKeyStoreDependent())
130                                            continue;
131    
132                                    if (!isDatastoreVersionCommandEnabled(cryptoContext, datastoreVersionCommand))
133                                            continue;
134    
135                                    try {
136                                            applyOneCommand(cryptoContext, keyStoreRefID, pm, datastoreVersionDAO, datastoreVersionID2DatastoreVersionMap, datastoreVersionCommand);
137                                    } catch (WorkInProgressException x) {
138                                            throw x;
139                                    } catch (Exception x) {
140                                            throw new CommandApplyException(
141                                                            String.format("Applying command failed: commandID='%s': %s", datastoreVersionCommand.getCommandID(), x.toString()),
142                                                            x
143                                            );
144                                    }
145                            }
146                    }
147            }
148    
149            protected boolean isDatastoreVersionCommandEnabled(CryptoContext cryptoContext, DatastoreVersionCommand datastoreVersionCommand) {
150                    String propertyKey = String.format("cumulus4j.DatastoreVersionCommand[%s].enabled", datastoreVersionCommand.getCommandID());
151                    Object propertyValue = cryptoContext.getExecutionContext().getStoreManager().getProperty(propertyKey);
152                    return propertyValue == null || !Boolean.FALSE.toString().toLowerCase(Locale.UK).equals(propertyValue.toString().toLowerCase(Locale.UK));
153            }
154    
155            protected List<DatastoreVersionCommand> createDatastoreVersionCommands() {
156                    List<DatastoreVersionCommand> datastoreVersionCommands = new ArrayList<DatastoreVersionCommand>(datastoreVersionCommandClassList.size());
157                    try {
158                            for (Class<? extends DatastoreVersionCommand> klass : datastoreVersionCommandClassList) {
159                                    DatastoreVersionCommand command = klass.newInstance();
160                                    datastoreVersionCommands.add(command);
161                            }
162                    } catch (InstantiationException e) {
163                            throw new RuntimeException(e);
164                    } catch (IllegalAccessException e) {
165                            throw new RuntimeException(e);
166                    }
167                    return datastoreVersionCommands;
168            }
169    
170            protected Map<String, DatastoreVersion> check(CryptoContext cryptoContext, int keyStoreRefID, PersistenceManager pm, DatastoreVersionDAO datastoreVersionDAO, List<DatastoreVersionCommand> datastoreVersionCommands) {
171                    Map<String, DatastoreVersion> datastoreVersionID2DatastoreVersionMap = datastoreVersionDAO.getCommandID2DatastoreVersionMap(keyStoreRefID);
172    
173                    for (DatastoreVersionCommand datastoreVersionCommand : datastoreVersionCommands) {
174                            DatastoreVersion datastoreVersion = datastoreVersionID2DatastoreVersionMap.get(datastoreVersionCommand.getCommandID());
175                            if (datastoreVersionCommand.isFinal()) {
176                                    if (datastoreVersion != null && datastoreVersion.getCommandVersion() != datastoreVersionCommand.getCommandVersion()) {
177                                            throw new IllegalStateException(String.format(
178                                                            "Final command class version does not match persistent version! datastoreVersionID='%s' datastoreVersionCommand.class='%s' datastoreVersionCommand.commandVersion=%s persistentDatastoreVersion.commandVersion=%s",
179                                                            datastoreVersionCommand.getCommandID(),
180                                                            datastoreVersionCommand.getClass().getName(),
181                                                            datastoreVersionCommand.getCommandVersion(),
182                                                            datastoreVersion.getCommandVersion()
183                                            ));
184                                    }
185                            }
186                            else if (datastoreVersion != null && datastoreVersion.getCommandVersion() > datastoreVersionCommand.getCommandVersion()) {
187                                    throw new IllegalStateException(String.format(
188                                                    "Non-final command class version is lower than persistent version! Downgrading is not supported! datastoreVersionID='%s' datastoreVersionCommand.class='%s' datastoreVersionCommand.commandVersion=%s persistentDatastoreVersion.commandVersion=%s",
189                                                    datastoreVersionCommand.getCommandID(),
190                                                    datastoreVersionCommand.getClass().getName(),
191                                                    datastoreVersionCommand.getCommandVersion(),
192                                                    datastoreVersion.getCommandVersion()
193                                    ));
194                            }
195                    }
196    
197                    return datastoreVersionID2DatastoreVersionMap;
198            }
199    
200            protected void applyOneCommand(
201                            CryptoContext cryptoContext, int keyStoreRefID, PersistenceManager pm,
202                            DatastoreVersionDAO datastoreVersionDAO, Map<String, DatastoreVersion> datastoreVersionID2DatastoreVersionMap, DatastoreVersionCommand datastoreVersionCommand
203            ) throws Exception
204            {
205                    String datastoreVersionID = datastoreVersionCommand.getCommandID();
206                    DatastoreVersion datastoreVersion = datastoreVersionID2DatastoreVersionMap.get(datastoreVersionID);
207                    if (datastoreVersion == null ||
208                                    (
209                                            !datastoreVersionCommand.isFinal() &&
210                                            datastoreVersionCommand.getCommandVersion() != datastoreVersion.getCommandVersion()
211                                    )
212                    )
213                    {
214                            DatastoreVersion datastoreVersionCopy = detachDatastoreVersion(pm, datastoreVersion);
215    //                      Map<String, DatastoreVersion> datastoreVersionID2DatastoreVersionMapCopy = detachDatastoreVersionID2DatastoreVersionMap(cryptoContext, pm, datastoreVersionID2DatastoreVersionMap);
216    
217                            Properties workInProgressStateProperties = new Properties();
218                            if (datastoreVersion == null)
219                                    datastoreVersion = new DatastoreVersion(datastoreVersionID, keyStoreRefID);
220                            else {
221                                    if (datastoreVersion.getWorkInProgressStateProperties() != null)
222                                            workInProgressStateProperties.load(new StringReader(datastoreVersion.getWorkInProgressStateProperties()));
223                            }
224    
225                            // apply
226                            try {
227                                    datastoreVersionCommand.apply(new CommandApplyParam(
228                                                    storeManager, cryptoContext, pm, datastoreVersionCopy, workInProgressStateProperties
229                                    ));
230                            } catch (WorkInProgressException x) {
231                                    datastoreVersion.setApplyTimestamp(new Date());
232                                    datastoreVersion.setWorkInProgressCommandVersion(datastoreVersionCommand.getCommandVersion());
233                                    datastoreVersion.setWorkInProgressManagerVersion(MANAGER_VERSION);
234                                    StringWriter writer = new StringWriter();
235                                    workInProgressStateProperties.store(writer, null);
236                                    datastoreVersion.setWorkInProgressStateProperties(writer.toString());
237                                    pm.flush();
238                                    throw x;
239                            }
240    
241                            datastoreVersion.setApplyTimestamp(new Date());
242                            datastoreVersion.setCommandVersion(datastoreVersionCommand.getCommandVersion());
243                            datastoreVersion.setManagerVersion(MANAGER_VERSION);
244                            datastoreVersion.setWorkInProgressCommandVersion(null);
245                            datastoreVersion.setWorkInProgressManagerVersion(null);
246                            datastoreVersion.setWorkInProgressStateProperties(""); // field does not accept null (no need for this extra info in the DB)
247                            pm.makePersistent(datastoreVersion); // just in case, it's new - otherwise doesn't hurt
248                            pm.flush(); // provoke early failure
249                    }
250            }
251    
252            protected DatastoreVersion detachDatastoreVersion(PersistenceManager pm, DatastoreVersion attached) {
253                    pm.getFetchPlan().setGroup(FetchPlan.ALL);
254                    pm.getFetchPlan().setMaxFetchDepth(-1);
255                    return attached == null ? null : pm.detachCopy(attached);
256            }
257    
258            protected Map<String, DatastoreVersion> detachDatastoreVersionID2DatastoreVersionMap(CryptoContext cryptoContext, PersistenceManager pm, Map<String, DatastoreVersion> datastoreVersionID2DatastoreVersionMap) {
259                    Map<String, DatastoreVersion> result = new HashMap<String, DatastoreVersion>(datastoreVersionID2DatastoreVersionMap.size());
260                    for (Map.Entry<String, DatastoreVersion> me : datastoreVersionID2DatastoreVersionMap.entrySet()) {
261                            result.put(me.getKey(), detachDatastoreVersion(pm, me.getValue()));
262                    }
263                    return Collections.unmodifiableMap(result);
264            }
265    }