/********************************************************************************** * $URL: https://boshi.rutgers.edu:8443/repos/sakai2.3/sakai/trunk/courier/courier-impl/impl/src/java/org/sakaiproject/courier/impl/BasicCourierService.java $ * $Id: BasicCourierService.java 786 2008-03-25 17:17:34Z hedrick $ *********************************************************************************** * * Copyright (c) 2005, 2006, 2007 The Sakai Foundation. * * Licensed under the Educational Community License, Version 1.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.opensource.org/licenses/ecl1.php * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * **********************************************************************************/ package org.sakaiproject.courier.impl; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.sakaiproject.courier.api.CourierService; import org.sakaiproject.courier.api.Delivery; /** *

* BasicCourierService is the implementation for the CourierService. *

*/ public class BasicCourierService implements CourierService { /** Our logger. */ private static final Log M_log = LogFactory.getLog(BasicCourierService.class); protected static final int nLocks = 100; /** Stores a List of Deliveries for each address, keyed by address. */ protected Map> m_addresses = new ConcurrentHashMap>(nLocks, 0.75f, nLocks); /** ** We need locking for two reasons: ** 1) To protect the integrity of the hash table, e.g. if two threads ** are doing inserts at the same time ** 2) To protect against race conditions when doing things like ** if (entry exists) { process it; delete it} ** The easy solution is to lock the whole hash table. However that ** is a performance problem, because it would mean that only one ** thread could be doing this code at a time. ** ** The solution uses here is to handle these two issues separately. ** For (1) I use ConcurrentHashMap. All operations are atomic, and ** there is no problem with multiple threads doing insert or delete. ** However this doesn't solve problem (2). ** ** For (2) what you really want to do is lock the address. Operations ** on different addresses don't interfere at all. However you can't ** lock an address directly, because if there are no entries in the ** hash table for an address, there's nothing to lock. So I hash ** the address, and lock an object corresponding to the hash value. ** That effectively locks all addresses that hash to that code. ** The number of different hash codes is a parameter. If you make it ** the same order of magnitude as the number of threads that need to ** be in this code at the same time, things should work pretty well. ** Hash collisions may cause some threads to wait for threads using ** different addresses, but at worst it should be 2 or 3 threads ** waiting, as opposed to a global lock where you'd be serializing ** all threads on a single lock. **/ protected Object[] locks; /********************************************************************************************************************************************************************************************************************************************************** * Dependencies and their setter methods *********************************************************************************************************************************************************************************************************************************************************/ /********************************************************************************************************************************************************************************************************************************************************** * Init and Destroy *********************************************************************************************************************************************************************************************************************************************************/ /** * Final initialization, once all dependencies are set. */ public void init() { M_log.info("init()"); m_addresses.clear(); locks = new Object[nLocks]; for (int i = 0; i < nLocks; i++) locks[i] = new Object(); } /** * Returns to uninitialized state. */ public void destroy() { M_log.info("destroy()"); m_addresses.clear(); locks = null; } /********************************************************************************************************************************************************************************************************************************************************** * CourierService implementation *********************************************************************************************************************************************************************************************************************************************************/ protected int slot(String s) { int hash = Math.abs(s.hashCode()); return hash % nLocks; } /** * Queue up a delivery for the client window identified in the Delivery * object. The particular form of delivery is determined by the type of * Delivery object sent. * * @param delivery * The Delivery (or extension) object to deliver. */ public void deliver(Delivery delivery) { if (M_log.isDebugEnabled()) M_log.debug("deliver(Delivery " + delivery + ")"); final String address = delivery.getAddress(); synchronized(locks[slot(address)]) { // find the entry in m_addresses List deliveries = m_addresses.get(address); // create if needed if (deliveries == null) { deliveries = new ArrayList(); } // if this doesn't exist in the list already, add it if (!deliveries.contains(delivery)) { deliveries.add(delivery); m_addresses.put(address, deliveries); } } } /** * Clear any pending delivery requests to the particular client window for * this element. * * @param address * The address of the client window. * @param elementId * The id of the html element. */ public void clear(String address, String elementId) { if (M_log.isDebugEnabled()) M_log.debug("clear(String " + address + ", String " + elementId + ")"); synchronized(locks[slot(address)]) { // find the entry in m_addresses List deliveries = m_addresses.get(address); // if not there we are done if (deliveries == null) return; // remove any Deliveries with this elementId for (Iterator it = deliveries.iterator(); it.hasNext();) { Delivery delivery = it.next(); if (delivery.getElement().equals(elementId)) { it.remove(); } } // if none left, remove it from the list if (deliveries.isEmpty()) { m_addresses.remove(address); } } } /** * Clear any pending delivery requests to this session client window. * * @param address * The address of client window. */ public void clear(String address) { if (M_log.isDebugEnabled()) M_log.debug("clear(String " + address + ")"); synchronized(locks[slot(address)]) { // remove this portal from m_addresses m_addresses.remove(address); } } /** * Access and de-queue the Deliveries queued up for a particular session client window. * * @param address * The address of client window. * @return a List of Delivery objects addressed to this session client window. * Usage Note: if you need deliveries in the order they were created, * you must call this method from a single thread per address, or * synchronize all the threads processing a single address. (Of * course you can use one thread for more than one address. The * problem is multiple threads handling deliveries for the same * address.) */ @SuppressWarnings("unchecked") public List getDeliveries(String address) { if (M_log.isDebugEnabled()) M_log.debug("getDeliveries(String " + address + ")"); List deliveries; synchronized(locks[slot(address)]) { // find the entry in m_addresses deliveries = m_addresses.get(address); if (deliveries == null) { // if null, return something return Collections.EMPTY_LIST; // this should return null! } else { m_addresses.remove(address); } } // do this outside of the sync. no reason to hold // other operations now that we've atomically gotten the list // I'm worried about delays and maybe even deadlocks if we // do arbitrary code while holding a lock. // "act" all the deliveries for (Delivery delivery : deliveries) { delivery.act(); } return deliveries; } /** * Check to see if there are any deliveries queued up for a particular * session client window. * * @param address * The address of the client window. * @return true if there are deliveries for this client window, false if * not. * * WARNING: This method is almost certainly not what you want. * I can see no way to use it that won't have synchronization problems. */ public boolean hasDeliveries(String address) { if (M_log.isDebugEnabled()) M_log.debug("hasDeliveries(String " + address + ")"); List deliveries; // find the entry in m_addresses synchronized(locks[slot(address)]) { deliveries = m_addresses.get(address); } if (deliveries == null) return false; return (!deliveries.isEmpty()); } }