/**********************************************************************************
* $URL: https://boshi.rutgers.edu:8443/repos/sakai2.3/sakai/trunk/courier/courier-impl/impl/src/java/org/sakaiproject/courier/impl/BasicCourierService.java $
* $Id: BasicCourierService.java 786 2008-03-25 17:17:34Z hedrick $
***********************************************************************************
*
* Copyright (c) 2005, 2006, 2007 The Sakai Foundation.
*
* Licensed under the Educational Community License, Version 1.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.opensource.org/licenses/ecl1.php
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**********************************************************************************/
package org.sakaiproject.courier.impl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.sakaiproject.courier.api.CourierService;
import org.sakaiproject.courier.api.Delivery;
/**
*
* BasicCourierService is the implementation for the CourierService.
*
*/
public class BasicCourierService implements CourierService
{
/** Our logger. */
private static final Log M_log = LogFactory.getLog(BasicCourierService.class);
protected static final int nLocks = 100;
/** Stores a List of Deliveries for each address, keyed by address. */
protected Map> m_addresses =
new ConcurrentHashMap>(nLocks, 0.75f, nLocks);
/**
** We need locking for two reasons:
** 1) To protect the integrity of the hash table, e.g. if two threads
** are doing inserts at the same time
** 2) To protect against race conditions when doing things like
** if (entry exists) { process it; delete it}
** The easy solution is to lock the whole hash table. However that
** is a performance problem, because it would mean that only one
** thread could be doing this code at a time.
**
** The solution uses here is to handle these two issues separately.
** For (1) I use ConcurrentHashMap. All operations are atomic, and
** there is no problem with multiple threads doing insert or delete.
** However this doesn't solve problem (2).
**
** For (2) what you really want to do is lock the address. Operations
** on different addresses don't interfere at all. However you can't
** lock an address directly, because if there are no entries in the
** hash table for an address, there's nothing to lock. So I hash
** the address, and lock an object corresponding to the hash value.
** That effectively locks all addresses that hash to that code.
** The number of different hash codes is a parameter. If you make it
** the same order of magnitude as the number of threads that need to
** be in this code at the same time, things should work pretty well.
** Hash collisions may cause some threads to wait for threads using
** different addresses, but at worst it should be 2 or 3 threads
** waiting, as opposed to a global lock where you'd be serializing
** all threads on a single lock.
**/
protected Object[] locks;
/**********************************************************************************************************************************************************************************************************************************************************
* Dependencies and their setter methods
*********************************************************************************************************************************************************************************************************************************************************/
/**********************************************************************************************************************************************************************************************************************************************************
* Init and Destroy
*********************************************************************************************************************************************************************************************************************************************************/
/**
* Final initialization, once all dependencies are set.
*/
public void init()
{
M_log.info("init()");
m_addresses.clear();
locks = new Object[nLocks];
for (int i = 0; i < nLocks; i++)
locks[i] = new Object();
}
/**
* Returns to uninitialized state.
*/
public void destroy()
{
M_log.info("destroy()");
m_addresses.clear();
locks = null;
}
/**********************************************************************************************************************************************************************************************************************************************************
* CourierService implementation
*********************************************************************************************************************************************************************************************************************************************************/
protected int slot(String s) {
int hash = Math.abs(s.hashCode());
return hash % nLocks;
}
/**
* Queue up a delivery for the client window identified in the Delivery
* object. The particular form of delivery is determined by the type of
* Delivery object sent.
*
* @param delivery
* The Delivery (or extension) object to deliver.
*/
public void deliver(Delivery delivery)
{
if (M_log.isDebugEnabled())
M_log.debug("deliver(Delivery " + delivery + ")");
final String address = delivery.getAddress();
synchronized(locks[slot(address)]) {
// find the entry in m_addresses
List deliveries = m_addresses.get(address);
// create if needed
if (deliveries == null) {
deliveries = new ArrayList();
}
// if this doesn't exist in the list already, add it
if (!deliveries.contains(delivery)) {
deliveries.add(delivery);
m_addresses.put(address, deliveries);
}
}
}
/**
* Clear any pending delivery requests to the particular client window for
* this element.
*
* @param address
* The address of the client window.
* @param elementId
* The id of the html element.
*/
public void clear(String address, String elementId)
{
if (M_log.isDebugEnabled())
M_log.debug("clear(String " + address + ", String " + elementId
+ ")");
synchronized(locks[slot(address)]) {
// find the entry in m_addresses
List deliveries = m_addresses.get(address);
// if not there we are done
if (deliveries == null) return;
// remove any Deliveries with this elementId
for (Iterator it = deliveries.iterator(); it.hasNext();) {
Delivery delivery = it.next();
if (delivery.getElement().equals(elementId)) {
it.remove();
}
}
// if none left, remove it from the list
if (deliveries.isEmpty()) {
m_addresses.remove(address);
}
}
}
/**
* Clear any pending delivery requests to this session client window.
*
* @param address
* The address of client window.
*/
public void clear(String address)
{
if (M_log.isDebugEnabled())
M_log.debug("clear(String " + address + ")");
synchronized(locks[slot(address)]) {
// remove this portal from m_addresses
m_addresses.remove(address);
}
}
/**
* Access and de-queue the Deliveries queued up for a particular session client window.
*
* @param address
* The address of client window.
* @return a List of Delivery objects addressed to this session client window.
* Usage Note: if you need deliveries in the order they were created,
* you must call this method from a single thread per address, or
* synchronize all the threads processing a single address. (Of
* course you can use one thread for more than one address. The
* problem is multiple threads handling deliveries for the same
* address.)
*/
@SuppressWarnings("unchecked")
public List getDeliveries(String address)
{
if (M_log.isDebugEnabled())
M_log.debug("getDeliveries(String " + address + ")");
List deliveries;
synchronized(locks[slot(address)]) {
// find the entry in m_addresses
deliveries = m_addresses.get(address);
if (deliveries == null) { // if null, return something
return Collections.EMPTY_LIST; // this should return null!
} else {
m_addresses.remove(address);
}
}
// do this outside of the sync. no reason to hold
// other operations now that we've atomically gotten the list
// I'm worried about delays and maybe even deadlocks if we
// do arbitrary code while holding a lock.
// "act" all the deliveries
for (Delivery delivery : deliveries)
{
delivery.act();
}
return deliveries;
}
/**
* Check to see if there are any deliveries queued up for a particular
* session client window.
*
* @param address
* The address of the client window.
* @return true if there are deliveries for this client window, false if
* not.
*
* WARNING: This method is almost certainly not what you want.
* I can see no way to use it that won't have synchronization problems.
*/
public boolean hasDeliveries(String address)
{
if (M_log.isDebugEnabled())
M_log.debug("hasDeliveries(String " + address + ")");
List deliveries;
// find the entry in m_addresses
synchronized(locks[slot(address)]) {
deliveries = m_addresses.get(address);
}
if (deliveries == null) return false;
return (!deliveries.isEmpty());
}
}