Index: /opt/cafe-trunk/email/email-impl/impl/src/java/org/sakaiproject/email/impl/BaseDigestService.java =================================================================== --- /opt/cafe-trunk/email/email-impl/impl/src/java/org/sakaiproject/email/impl/BaseDigestService.java (revision 36604) +++ /opt/cafe-trunk/email/email-impl/impl/src/java/org/sakaiproject/email/impl/BaseDigestService.java (working copy) @@ -21,18 +21,23 @@ package org.sakaiproject.email.impl; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.Hashtable; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Stack; +import java.util.Timer; +import java.util.TimerTask; import java.util.Vector; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.sakaiproject.authz.api.SecurityService; import org.sakaiproject.component.api.ServerConfigurationService; -import org.sakaiproject.component.cover.ComponentManager; import org.sakaiproject.email.api.Digest; import org.sakaiproject.email.api.DigestEdit; import org.sakaiproject.email.api.DigestMessage; @@ -51,14 +56,14 @@ import org.sakaiproject.time.api.TimeBreakdown; import org.sakaiproject.time.api.TimeRange; import org.sakaiproject.time.api.TimeService; +import org.sakaiproject.tool.api.SessionBindingEvent; +import org.sakaiproject.tool.api.SessionBindingListener; +import org.sakaiproject.tool.api.SessionManager; import org.sakaiproject.user.api.UserDirectoryService; import org.sakaiproject.util.BaseResourcePropertiesEdit; import org.sakaiproject.util.ResourceLoader; import org.sakaiproject.util.StorageUser; import org.sakaiproject.util.Xml; -import org.sakaiproject.tool.api.SessionBindingEvent; -import org.sakaiproject.tool.api.SessionBindingListener; -import org.sakaiproject.tool.api.SessionManager; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; @@ -69,7 
+74,7 @@ * BaseDigestService is the base service for DigestService. *

*/ -public abstract class BaseDigestService implements DigestService, StorageUser, Runnable +public abstract class BaseDigestService implements DigestService, StorageUser //, Runnable { /** Our logger. */ private static Log M_log = LogFactory.getLog(BasicEmailService.class); @@ -86,92 +91,123 @@ protected List m_digestQueue = new Vector(); /** The thread I run my periodic clean and report on. */ - protected Thread m_thread = null; +// protected Thread m_thread = null; /** My thread's quit flag. */ - protected boolean m_threadStop = false; +// protected boolean m_threadStop = false; - /** How long to wait between runnable runs (ms). */ - protected static final long PERIOD = 1000; + /** How long to wait between digest checks (seconds) */ + private int DIGEST_PERIOD = 3600; + /** How long to wait between digest checks (seconds) */ + public void setDIGEST_PERIOD(int digest_period) { + DIGEST_PERIOD = digest_period; + } + /** How long to wait before the first digest check (seconds) */ + private int DIGEST_DELAY = 300; + /** How long to wait before the first digest check (seconds) */ + public void setDIGEST_DELAY(int digest_delay) { + DIGEST_DELAY = digest_delay; + } + /** True if we are in the mode of sending out digests, false if we are waiting. */ protected boolean m_sendDigests = true; /** The time period last time the sendDigests() was called. 
*/ protected String m_lastSendPeriod = null; - /********************************************************************************************************************************************************************************************************************************************************** + /** + * Use a timer for repeating actions + */ + private Timer digestTimer = new Timer(true); + + /** + * This is the name of the sakai.properties property for the DIGEST_PERIOD, + * this is how long (in seconds) the digest service will wait between checking to see if there + * are digests that need to be sent (they are always only sent once per day), default=3600 + */ + public static final String EMAIL_DIGEST_CHECK_PERIOD_PROPERTY = "email.digest.check.period"; + /** + * This is the name of the sakai.properties property for the DIGEST_DELAY, + * this is how long (in seconds) the digest service will wait after starting up + * before it does the first check for sending out digests, default=300 + */ + public static final String EMAIL_DIGEST_START_DELAY_PROPERTY = "email.digest.start.delay"; + + /********************************************************************************************************************************************************************************************************************************************************** * Runnable *********************************************************************************************************************************************************************************************************************************************************/ /** * Start the clean and report thread. */ - protected void start() - { - m_threadStop = false; +// protected void start() +// { +// m_threadStop = false; +// +// m_thread = new Thread(this, getClass().getName()); +// m_thread.start(); +// } - m_thread = new Thread(this, getClass().getName()); - m_thread.start(); - } - /** * Stop the clean and report thread. 
*/ - protected void stop() - { - if (m_thread == null) return; +// protected void stop() +// { +// if (m_thread == null) return; +// +// // signal the thread to stop +// m_threadStop = true; +// +// // wake up the thread +// m_thread.interrupt(); +// +// m_thread = null; +// } - // signal the thread to stop - m_threadStop = true; - - // wake up the thread - m_thread.interrupt(); - - m_thread = null; - } - /** * Run the clean and report thread. */ - public void run() - { - // since we might be running while the component manager is still being created and populated, such as at server - // startup, wait here for a complete component manager - ComponentManager.waitTillConfigured(); +// public void run() +// { +// // since we might be running while the component manager is still being created and populated, such as at server +// // startup, wait here for a complete component manager +// ComponentManager.waitTillConfigured(); +// +// // loop till told to stop +// while ((!m_threadStop) && (!Thread.currentThread().isInterrupted())) +// { +// try +// { +// // process the queue of digest requests +// processQueue(); +// +// // check for a digest mailing time +// sendDigests(); +// } +// catch (Throwable e) +// { +// M_log.warn(": exception: ", e); +// } +// +// // take a small nap +// try +// { +// Thread.sleep(PERIOD); +// } +// catch (Throwable ignore) +// { +// } +// } +// } - // loop till told to stop - while ((!m_threadStop) && (!Thread.currentThread().isInterrupted())) - { - try - { - // process the queue of digest requests - processQueue(); - - // check for a digest mailing time - sendDigests(); - } - catch (Throwable e) - { - M_log.warn(": exception: ", e); - } - - // take a small nap - try - { - Thread.sleep(PERIOD); - } - catch (Throwable ignore) - { - } - } - } - /** * Attempt to process all the queued digest requests. Ones that cannot be processed now will be returned to the queue. 
*/ protected void processQueue() { + M_log.debug("Processing mail digest queue..."); + // setup a re-try queue List retry = new Vector(); @@ -195,6 +231,7 @@ } catch (InUseException e) { + M_log.warn("digest in use, will try send again at next digest attempt: " + e.getMessage()); // retry next time retry.add(message); } @@ -215,7 +252,9 @@ */ protected void sendDigests() { - // compute the current period + if (M_log.isDebugEnabled()) M_log.debug("checking for sending digests"); + + // compute the current period String curPeriod = computeRange(timeService().newTime()).toString(); // if we are in a new period, start sending again @@ -230,7 +269,7 @@ // if we are not sending, early out if (!m_sendDigests) return; - if (M_log.isDebugEnabled()) M_log.debug("checking for sending digests"); + M_log.info("Preparing to send the mail digests for "+curPeriod); // count send candidate digests int count = 0; @@ -255,7 +294,9 @@ break; } } - if (!found) continue; + if (!found) { + continue; + } // this digest is a send candidate count++; @@ -555,17 +596,44 @@ // setup the queue m_digestQueue.clear(); - start(); + // USE A TIMER INSTEAD OF CREATING A NEW THREAD -AZ +// start(); + int digestPeriod = serverConfigurationService().getInt(EMAIL_DIGEST_CHECK_PERIOD_PROPERTY, DIGEST_PERIOD); + int digestDelay = serverConfigurationService().getInt(EMAIL_DIGEST_START_DELAY_PROPERTY, DIGEST_DELAY); + digestDelay += new Random().nextInt(60); // add some random delay to get the servers out of sync + digestTimer.schedule(new DigestTimerTask(), (digestDelay * 1000L), (digestPeriod * 1000L) ); /* multiply as long so the seconds-to-milliseconds conversion cannot overflow int */ - M_log.info("init()"); + M_log.info("init(): email digests will be checked in " + digestDelay + " seconds and then every " + + digestPeriod + " seconds while the server is running" ); } /** + * This timer task is run by the timer thread based on the period set above + * + * @author Aaron Zeckoski (aaron@caret.cam.ac.uk) + */ + private class DigestTimerTask extends TimerTask { + @Override + public void 
run() { + try { + M_log.debug("running timer task"); + // process the queue of digest requests + processQueue(); + // check for a digest mailing time + sendDigests(); + } catch (Exception e) { + M_log.error("Digest failure: " + e.getMessage(), e); + } + } + } + + /** * Returns to uninitialized state. */ public void destroy() { - stop(); +// stop(); + digestTimer.cancel(); m_storage.close(); m_storage = null; @@ -1011,11 +1079,38 @@ { // find the range String range = computeRange(period).toString(); + /* + * http://jira.sakaiproject.org/jira/browse/SAK-11841 + * If the current date/time gets out of sync with the stored date/time periods then + * messages will sit in the queue forever and cause looping in the code which will + * never resolve itself. To keep this from happening I am adding in an extra + * check as a stopgap which will do the reverse check in the case that nothing + * is retrieved. This is really ugly and needs to be done a better way + * (which means a way that is not so fragile) -AZ + */ List msgs = (List) m_ranges.get(range); + if (msgs == null) { + // nothing found so go through all stored range keys and parse the start time out of each key + SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm"); + for (Iterator iterator = m_ranges.keySet().iterator(); iterator.hasNext();) { + String rangeKey = (String) iterator.next(); + Date startDate; + try { + startDate = formatter.parse( rangeKey.substring(0, 12) ); + long difference = Math.abs( period.getTime() - startDate.getTime() ); + if (difference < (12 * 60 * 60 * 1000)) { + // within 12 hours of the correct period so use this one + msgs = (List) m_ranges.get(rangeKey); + break; + } + } catch (ParseException e) { + M_log.warn("Failed to parse first 12 chars from '"+rangeKey+"' into a date, aborting the attempt to find close data matches", e); + } + } + } List rv = new Vector(); - if (msgs != null) - { + if (msgs != null) { rv.addAll(msgs); } @@ -1478,4 +1573,5 @@ Time end = timeService().newTime(start.getTime() + 24 * 60 
* 60 * 1000); return timeService().newTimeRange(start, end, true, false); } + }