/*
Copyright 2008 The 'A Concurrent Hashtable' development team
(http://www.codeplex.com/CH/People/ProjectPeople.aspx)
This library is licensed under the GNU Library General Public License (LGPL). You should
have received a copy of the license along with the source code. If not, an online copy
of the license can be found at http://www.codeplex.com/CH/license.
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;
namespace Orvid.Concurrent.Collections
{
/// <summary>
/// Base class for concurrent hashtable implementations.
/// </summary>
/// <typeparam name="TStored">Type of the items stored in the hashtable.</typeparam>
/// <typeparam name="TSearch">Type of the key to search with.</typeparam>
public abstract class ConcurrentHashtable
{
/// <summary>
/// Constructor (protected). The constructor body is empty; no table state is set up here.
/// </summary>
/// <remarks>Use the Initialize method after construction.</remarks>
protected ConcurrentHashtable()
{}
/// <summary>
/// Sets up the initial state of a newly created ConcurrentHashtable. Invoke in the final
/// (sealed) constructor or in a Create method.
/// </summary>
/// <remarks>
/// Creates a range of MinSegments segments with MinSegmentAllocatedSpace slots each, uses that
/// range as both the current and the new range, resets the switch point to 0 and records the
/// total allocated space (segments times slots per segment).
/// </remarks>
protected virtual void Initialize()
{
    var segmentCount = MinSegments;
    var slotsPerSegment = MinSegmentAllocatedSpace;
    var initialRange = CreateSegmentRange(segmentCount, slotsPerSegment);

    // No re-segmentation is in progress yet, so both range fields refer to the same range.
    _CurrentRange = initialRange;
    _NewRange = initialRange;
    _SwitchPoint = 0;
    _AllocatedSpace = segmentCount * slotsPerSegment;
}
/// <summary>
/// Create a segment range.
/// </summary>
/// <param name="segmentCount">Number of segments in the range.</param>
/// <param name="initialSegmentSize">Number of slots initially allocated in each segment.</param>
/// <returns>The created instance.</returns>
internal virtual Segmentrange CreateSegmentRange(int segmentCount, int initialSegmentSize)
{
    // Virtual so that derived tables can supply their own range; the default simply
    // forwards to the range factory.
    return Segmentrange.Create(segmentCount, initialSegmentSize);
}
/// <summary>
/// While adjusting the segmentation, _NewRange will hold a reference to the new range of segments.
/// When the adjustment is complete this reference will be copied to _CurrentRange.
/// </summary>
internal Segmentrange _NewRange;
/// <summary>
/// Will hold the most current range of segments. When busy adjusting the segmentation, this
/// field will hold a reference to the old range.
/// </summary>
internal Segmentrange _CurrentRange;
/// <summary>
/// While adjusting the segmentation this field will hold a boundary.
/// Clients accessing items with a key hash value below this boundary (unsigned compared)
/// will access _NewRange. The others will access _CurrentRange.
/// </summary>
/// <remarks>
/// Stored as a signed Int32 and cast to UInt32 wherever it is compared against a hash.
/// </remarks>
Int32 _SwitchPoint;
#region Traits
//Methods used by Segment objects that tell them how to treat stored items and search keys.
/// <summary>
/// Get a hashcode for given storeable item.
/// </summary>
/// <param name="item">Reference to the item to get a hash value for.</param>
/// <returns>The hash value as a <see cref="UInt32"/>.</returns>
/// <remarks>
/// The hash returned should be a properly randomized hash. Standard hash code methods are usually not good enough.
/// A storeable item and a matching search key should return the same hash code.
/// So the statement <c>ItemEqualsKey(storeableItem, searchKey) ? GetItemHashCode(storeableItem) == GetKeyHashCode(searchKey) : true</c> should always be true.
/// </remarks>
internal protected abstract UInt32 GetItemHashCode(ref TStored item);
/// <summary>
/// Get a hashcode for given search key.
/// </summary>
/// <param name="key">Reference to the key to get a hash value for.</param>
/// <returns>The hash value as a <see cref="UInt32"/>.</returns>
/// <remarks>
/// The hash returned should be a properly randomized hash. Standard hash code methods are usually not good enough.
/// A storeable item and a matching search key should return the same hash code.
/// So the statement <c>ItemEqualsKey(storeableItem, searchKey) ? GetItemHashCode(storeableItem) == GetKeyHashCode(searchKey) : true</c> should always be true.
/// </remarks>
internal protected abstract UInt32 GetKeyHashCode(ref TSearch key);
/// <summary>
/// Compares a storeable item to a search key. Should return true if they match.
/// </summary>
/// <param name="item">Reference to the storeable item to compare.</param>
/// <param name="key">Reference to the search key to compare.</param>
/// <returns>True if the storeable item and search key match; false otherwise.</returns>
internal protected abstract bool ItemEqualsKey(ref TStored item, ref TSearch key);
/// <summary>
/// Compares two storeable items for equality.
/// </summary>
/// <param name="item1">Reference to the first storeable item to compare.</param>
/// <param name="item2">Reference to the second storeable item to compare.</param>
/// <returns>True if the two storeable items should be regarded as equal.</returns>
internal protected abstract bool ItemEqualsItem(ref TStored item1, ref TStored item2);
/// <summary>
/// Indicates if a specific item reference contains a valid item.
/// </summary>
/// <param name="item">The storeable item reference to check.</param>
/// <returns>True if the reference doesn't refer to a valid item; false otherwise.</returns>
/// <remarks>The statement <c>IsEmpty(default(TStored))</c> should always be true.</remarks>
internal protected abstract bool IsEmpty(ref TStored item);
/// <summary>
/// Returns the type of the key value or object.
/// </summary>
/// <param name="item">The stored item to get the type of the key for.</param>
/// <returns>The actual type of the key or null if it can not be determined.</returns>
/// <remarks>
/// Used for diagnostics purposes. This default implementation reports the runtime type of
/// the stored item itself and yields null when the item is null.
/// </remarks>
internal protected virtual Type GetKeyType(ref TStored item)
{
    if (item == null)
    {
        return null;
    }

    return item.GetType();
}
#endregion
#region SyncRoot
// Private lock object handed out through the SyncRoot property.
readonly object _SyncRoot = new object();

/// <summary>
/// Returns an object that serves as a lock for range operations.
/// </summary>
/// <remarks>
/// Clients use this primarily for enumerating over the table's contents.
/// Locking doesn't guarantee that the contents don't change, but prevents operations that would
/// disrupt the enumeration process.
/// Operations that use this lock:
/// Count, Clear, DisposeGarbage and AssessSegmentation.
/// Keeping this lock will prevent the table from re-segmenting.
/// </remarks>
protected object SyncRoot
{
    get
    {
        return _SyncRoot;
    }
}
#endregion
#region Per segment accessors
/// <summary>
/// Gets a segment out of either _NewRange or _CurrentRange based on the hash value.
/// </summary>
/// <param name="hash">The hash value that selects the segment.</param>
/// <returns>
/// The segment for the hash, taken from _NewRange when the hash (unsigned compared) lies
/// below _SwitchPoint and from _CurrentRange otherwise.
/// </returns>
internal Segment GetSegment(UInt32 hash)
{
    if (hash < (UInt32)_SwitchPoint)
    {
        return _NewRange.GetSegment(hash);
    }

    return _CurrentRange.GetSegment(hash);
}
/// <summary>
/// Gets a segment, LOCKED for writing, out of either _NewRange or _CurrentRange based on the hash value.
/// ReleaseForWriting needs to be called on this segment before it can be used by other clients.
/// </summary>
/// <param name="hash">The hash value that selects the segment.</param>
/// <returns>A segment that is locked for writing and reports itself as alive.</returns>
internal Segment GetSegmentLockedForWriting(UInt32 hash)
{
    for (; ; )
    {
        var candidate = GetSegment(hash);
        candidate.LockForWriting();

        if (!candidate.IsAlive)
        {
            // The segment is no longer alive: let go of it and look the segment up again.
            candidate.ReleaseForWriting();
            continue;
        }

        return candidate;
    }
}
/// <summary>
/// Gets a segment, LOCKED for reading, out of either _NewRange or _CurrentRange based on the hash value.
/// ReleaseForReading needs to be called on this segment before it can be used by other clients.
/// </summary>
/// <param name="hash">The hash value that selects the segment.</param>
/// <returns>A segment that is locked for reading and reports itself as alive.</returns>
internal Segment GetSegmentLockedForReading(UInt32 hash)
{
    for (; ; )
    {
        var candidate = GetSegment(hash);
        candidate.LockForReading();

        if (!candidate.IsAlive)
        {
            // The segment is no longer alive: let go of it and look the segment up again.
            candidate.ReleaseForReading();
            continue;
        }

        return candidate;
    }
}
/// <summary>
/// Finds an item in the table collection that matches the given searchKey.
/// </summary>
/// <param name="searchKey">The key to the item.</param>
/// <param name="item">Out reference to a field that will receive the found item.</param>
/// <returns>A boolean that will be true if an item has been found and false otherwise.</returns>
protected bool FindItem(ref TSearch searchKey, out TStored item)
{
    var keyHash = this.GetKeyHashCode(ref searchKey);
    var lockedSegment = GetSegmentLockedForReading(keyHash);

    try
    {
        bool found = lockedSegment.FindItem(ref searchKey, out item, this);
        return found;
    }
    finally
    {
        // Always give up the read lock, also when the lookup throws.
        lockedSegment.ReleaseForReading();
    }
}
/// <summary>
/// Looks for an existing item in the table contents using an alternative copy. If it can be found it will be returned.
/// If not then the alternative copy will be added to the table contents and the alternative copy will be returned.
/// </summary>
/// <param name="searchKey">A copy to search an already existing instance with.</param>
/// <param name="item">Out reference to receive the found item or the alternative copy.</param>
/// <returns>A boolean that will be true if an existing copy was found and false otherwise.</returns>
protected virtual bool GetOldestItem(ref TStored searchKey, out TStored item)
{
    var itemHash = this.GetItemHashCode(ref searchKey);
    var lockedSegment = GetSegmentLockedForWriting(itemHash);

    try
    {
        bool existingFound = lockedSegment.GetOldestItem(ref searchKey, out item, this);
        return existingFound;
    }
    finally
    {
        // Always give up the write lock, also when the segment operation throws.
        lockedSegment.ReleaseForWriting();
    }
}
/// <summary>
/// Replaces an existing item, subject to approval by a callback.
/// </summary>
/// <param name="searchKey">Key used to remove <paramref name="newItem"/> again when it turned out not to replace anything.</param>
/// <param name="newItem">The item that should take the place of the existing item. Its hash selects the segment.</param>
/// <param name="oldItem">Out reference that receives the item that was replaced, if any.</param>
/// <param name="sanction">Callback that receives the replaced item and returns true when the replacement may stand.</param>
/// <returns>true if the existing item was successfully replaced.</returns>
/// <remarks>
/// The new item is inserted first. When that did not replace an existing item the insertion is
/// undone and false is returned. When <paramref name="sanction"/> rejects the replacement, or
/// fails with an exception, the old item is put back so the table never keeps a replacement
/// that was not sanctioned.
/// </remarks>
protected bool ReplaceItem(ref TSearch searchKey, ref TStored newItem, out TStored oldItem, Func sanction)
{
    var segment = GetSegmentLockedForWriting(this.GetItemHashCode(ref newItem));
    try
    {
        TStored dummy;

        if (!segment.InsertItem(ref newItem, out oldItem, this))
        {
            // Nothing was replaced, so the insert added a brand new entry: take it out again.
            segment.RemoveItem(ref searchKey, out dummy, this);
            return false;
        }

        bool sanctioned = false;
        try
        {
            sanctioned = sanction(oldItem);
        }
        finally
        {
            // Restore the old item on rejection AND when the callback throws; otherwise an
            // exception would silently leave the unsanctioned new item in the table.
            if (!sanctioned)
                segment.InsertItem(ref oldItem, out dummy, this);
        }

        return sanctioned;
    }
    finally
    { segment.ReleaseForWriting(); }
}
/// <summary>
/// Inserts an item in the table contents possibly replacing an existing item.
/// </summary>
/// <param name="searchKey">The item to insert in the table.</param>
/// <param name="replacedItem">Out reference to a field that will receive any possibly replaced item.</param>
/// <returns>A boolean that will be true if an existing copy was found and replaced and false otherwise.</returns>
protected bool InsertItem(ref TStored searchKey, out TStored replacedItem)
{
    var itemHash = this.GetItemHashCode(ref searchKey);
    var lockedSegment = GetSegmentLockedForWriting(itemHash);

    try
    {
        bool replaced = lockedSegment.InsertItem(ref searchKey, out replacedItem, this);
        return replaced;
    }
    finally
    {
        // Always give up the write lock, also when the insert throws.
        lockedSegment.ReleaseForWriting();
    }
}
/// <summary>
/// Removes an item from the table contents.
/// </summary>
/// <param name="searchKey">The key to find the item with.</param>
/// <param name="removedItem">Out reference to a field that will receive the found and removed item.</param>
/// <returns>A boolean that will be true if an item was found and removed and false otherwise.</returns>
protected bool RemoveItem(ref TSearch searchKey, out TStored removedItem)
{
    var keyHash = this.GetKeyHashCode(ref searchKey);
    var lockedSegment = GetSegmentLockedForWriting(keyHash);

    try
    {
        bool removed = lockedSegment.RemoveItem(ref searchKey, out removedItem, this);
        return removed;
    }
    finally
    {
        // Always give up the write lock, also when the removal throws.
        lockedSegment.ReleaseForWriting();
    }
}
#endregion
#region Collection wide accessors
//These methods require a lock on SyncRoot. They will not block regular per segment accessors (for long)
/// <summary>
/// Enumerates all segments in _CurrentRange, locking each one before yielding it and releasing the lock afterwards.
/// The order in which the segments are returned is undefined.
/// Lock SyncRoot before using this enumerable.
/// </summary>
/// <param name="forReading">Passed on to the Lock and Release calls of every segment; selects the kind of lock taken.</param>
/// <returns>An enumerable that yields every segment of _CurrentRange once, each locked while it is yielded.</returns>
internal IEnumerable> EnumerateAmorphLockedSegments(bool forReading)
{
//Segments whose Lock call returns false are put in a queue to be retried later,
//so that we can continue with the other segments first.
//The queue is only created when it is actually needed.
Queue> lockedSegmentIxs = null;
for (int i = 0, end = _CurrentRange.Count; i != end; ++i)
{
var segment = _CurrentRange.GetSegmentByIndex(i);
if (segment.Lock(forReading))
{
//the finally releases the lock when the consumer moves on or disposes the enumerator.
try { yield return segment; }
finally { segment.Release(forReading); }
}
else
{
if (lockedSegmentIxs == null)
lockedSegmentIxs = new Queue>();
lockedSegmentIxs.Enqueue(segment);
}
}
if (lockedSegmentIxs != null)
{
//ctr counts down the segments still to be tried in the current retry round.
var ctr = lockedSegmentIxs.Count;
while (lockedSegmentIxs.Count != 0)
{
//once we have retried them all and we are still not done: yield the thread, then start a new round.
if (ctr-- == 0)
{
Thread.Sleep(0);
ctr = lockedSegmentIxs.Count;
}
var segment = lockedSegmentIxs.Dequeue();
if (segment.Lock(forReading))
{
try { yield return segment; }
finally { segment.Release(forReading); }
}
else
//still not lockable; put it back at the end of the queue.
lockedSegmentIxs.Enqueue(segment);
}
}
}
/// <summary>
/// Gets an IEnumerable to iterate over all items in all segments.
/// </summary>
/// <remarks>
/// A lock should be acquired and held on SyncRoot while this IEnumerable is being used.
/// The order in which the items are returned is undetermined.
/// Each segment is locked (requested with forReading set to true) while its items are yielded.
/// </remarks>
protected IEnumerable Items
{
    get
    {
        foreach (var lockedSegment in EnumerateAmorphLockedSegments(true))
        {
            TStored current;

            // GetNextItem takes the previous position (-1 to start) and returns the next
            // position, or a negative value once the segment has no more items.
            for (int cursor = lockedSegment.GetNextItem(-1, out current, this);
                 cursor >= 0;
                 cursor = lockedSegment.GetNextItem(cursor, out current, this))
            {
                yield return current;
            }
        }
    }
}
/// <summary>
/// Removes all items from the collection.
/// Acquires a lock on SyncRoot before it does its thing.
/// </summary>
/// <remarks>
/// When this method returns and multiple threads have access to this table it
/// is not guaranteed that the table is actually empty.
/// </remarks>
protected void Clear()
{
    lock (SyncRoot)
    {
        // forReading is false here: each segment is locked in the non-reading mode before it is cleared.
        foreach (var lockedSegment in EnumerateAmorphLockedSegments(false))
        {
            lockedSegment.Clear(this);
        }
    }
}
/// <summary>
/// Returns a count of all items in the collection. This may not be
/// accurate when multiple threads are accessing this table.
/// Acquires a lock on SyncRoot before it does its thing.
/// </summary>
protected int Count
{
    get
    {
        lock (SyncRoot)
        {
            Int32 total = 0;
            int segmentCount = _CurrentRange.Count;
            int index = 0;

            // The per segment counters are read without locking the segments.
            while (index != segmentCount)
            {
                total += _CurrentRange.GetSegmentByIndex(index)._Count;
                ++index;
            }

            return total;
        }
    }
}
#endregion
#region Table Maintenance methods
/// <summary>
/// Gives the minimum number of segments a hashtable can contain. This should be 1 or more and always a power of 2.
/// </summary>
/// <remarks>The default is 4.</remarks>
protected virtual Int32 MinSegments
{
    get
    {
        return 4;
    }
}
/// <summary>
/// Gives the minimum number of allocated item slots per segment. This should be 1 or more, always a power of 2
/// and less than 1/2 of MeanSegmentAllocatedSpace.
/// </summary>
/// <remarks>The default is 4.</remarks>
protected virtual Int32 MinSegmentAllocatedSpace
{
    get
    {
        return 4;
    }
}
/// <summary>
/// Gives the preferred number of allocated item slots per segment. This should be 4 or more and always a power of 2.
/// </summary>
/// <remarks>The default is 16.</remarks>
protected virtual Int32 MeanSegmentAllocatedSpace
{
    get
    {
        return 16;
    }
}
/// <summary>
/// Determines if a segmentation adjustment is needed.
/// </summary>
/// <returns>
/// True when the wanted space (the allocated space, but at least MinSegments times
/// MeanSegmentAllocatedSpace) is more than twice, or at most half of, the space the current
/// range would hold at MeanSegmentAllocatedSpace slots per segment; false otherwise.
/// </returns>
bool SegmentationAdjustmentNeeded()
{
    var segmentFloor = MinSegments;
    var slotsPerSegment = MeanSegmentAllocatedSpace;
    var wantedSpace = Math.Max(_AllocatedSpace, segmentFloor * slotsPerSegment);
    var currentMeanSpace = _CurrentRange.Count * slotsPerSegment;

    // Too much space for the current number of segments: more segments are needed.
    if (wantedSpace > (currentMeanSpace << 1))
    {
        return true;
    }

    // Half or less of the mean space is in use: fewer segments will do.
    return wantedSpace <= (currentMeanSpace >> 1);
}
/// <summary>
/// Bool as int (for interlocked functions) that is true (1) if a segmentation assessment is pending
/// and false (0) otherwise.
/// </summary>
Int32 _AssessSegmentationPending;
/// <summary>
/// The total allocated number of item slots, whether filled with nonempty items or not.
/// Updated through Interlocked.Add.
/// </summary>
Int32 _AllocatedSpace;
/// <summary>
/// When a segment resizes it uses this method to inform the hashtable of the change in allocated space.
/// </summary>
/// <param name="effect">The change in allocated item slots; added to the running total.</param>
/// <remarks>
/// After updating the total, a segmentation assessment is queued on the thread pool when an
/// adjustment is needed and no assessment is pending yet.
/// </remarks>
internal void EffectTotalAllocatedSpace(Int32 effect)
{
    // This might be a point of contention, but resizing of segments should happen (far) less
    // often than inserts and removals and therefore this should not pose a problem.
    Interlocked.Add(ref _AllocatedSpace, effect);

    if (!SegmentationAdjustmentNeeded())
    {
        return;
    }

    // Only the caller that flips the pending flag from 0 to 1 queues the work item.
    if (Interlocked.Exchange(ref _AssessSegmentationPending, 1) == 0)
    {
        ThreadPool.QueueUserWorkItem(AssessSegmentation);
    }
}
/// <summary>
/// Schedule a call to the AssessSegmentation() method.
/// </summary>
/// <remarks>
/// Nothing is queued when an assessment is already pending.
/// </remarks>
protected void ScheduleMaintenance()
{
    bool alreadyPending = Interlocked.Exchange(ref _AssessSegmentationPending, 1) != 0;

    if (alreadyPending)
    {
        return;
    }

    ThreadPool.QueueUserWorkItem(AssessSegmentation);
}
/// <summary>
/// Thread pool entry point: checks if segmentation needs to be adjusted and if so performs the adjustment.
/// </summary>
/// <param name="dummy">Unused; present so that the method can be queued as a thread pool work item.</param>
void AssessSegmentation(object dummy)
{
try
{
AssessSegmentation();
}
finally
{
//Clear the pending flag first, then re-check with a zero effect: if an adjustment is
//needed again by now, that call queues a new assessment.
Interlocked.Exchange(ref _AssessSegmentationPending, 0);
EffectTotalAllocatedSpace(0);
}
}
/// <summary>
/// This method is called when a re-segmentation is expected to be needed. It checks if it actually is needed and, if so, performs the re-segmentation.
/// </summary>
/// <remarks>
/// The new segment count is the smallest value obtained by doubling MinSegments that is not
/// below the allocated space divided by MeanSegmentAllocatedSpace.
/// </remarks>
protected virtual void AssessSegmentation()
{
    // In case of a sudden loss of almost all content we may need to do this multiple times.
    while (SegmentationAdjustmentNeeded())
    {
        var slotsPerSegment = MeanSegmentAllocatedSpace;
        int requiredSegments = _AllocatedSpace / slotsPerSegment;
        Int32 segmentCount = MinSegments;

        // Keep doubling until the required number of segments is covered.
        while (segmentCount < requiredSegments)
        {
            segmentCount <<= 1;
        }

        SetSegmentation(segmentCount, slotsPerSegment);
    }
}
/// <summary>
/// Adjusts the segmentation to the new segment count by migrating all items from the
/// current range to a newly created range, one current segment at a time.
/// </summary>
/// <param name="newSegmentCount">The new number of segments to use. This must be a power of 2.</param>
/// <param name="segmentSize">The number of item slots to reserve in each segment.</param>
/// <remarks>
/// Holds the lock on SyncRoot for the whole operation. All segments of the new range start out
/// locked for writing and are released step by step as _SwitchPoint advances past them.
/// </remarks>
void SetSegmentation(Int32 newSegmentCount, Int32 segmentSize)
{
//Variables to detect a bad hash.
//They are updated while the new segments are released below, but nothing in this
//method reads them afterwards.
var totalNewSegmentSize = 0;
var largestSegmentSize = 0;
Segment largestSegment = null;
lock (SyncRoot)
{
#if DEBUG
//<<<<<<<<<<<<<<<<<<<< debug <<<<<<<<<<<<<<<<<<<<<<<<
//{
// int minSize = _CurrentRange.GetSegmentByIndex(0)._List.Length;
// int maxSize = minSize;
// for (int i = 1, end = _CurrentRange.Count; i < end; ++i)
// {
// int currentSize = _CurrentRange.GetSegmentByIndex(i)._List.Length;
// if (currentSize < minSize)
// minSize = currentSize;
// if (currentSize > maxSize)
// maxSize = currentSize;
// }
// System.Diagnostics.Debug.Assert(maxSize <= 8 * minSize, "Probably a bad hash");
//}
//>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
#endif
//unchecked: the switch point arithmetic below relies on UInt32 wrap-around.
unchecked
{
//create the new range
Segmentrange newRange = CreateSegmentRange(newSegmentCount, segmentSize);
//Increase total allocated space now. We can do this safely
//because at this point the _AssessSegmentationPending flag will be true,
//preventing an immediate reassessment.
Interlocked.Add(ref _AllocatedSpace, newSegmentCount * segmentSize);
//Lock all new segments.
//We are going to release these locks while we migrate the items from the
//old (current) range to the new range.
for (int i = 0, end = newRange.Count; i != end; ++i)
newRange.GetSegmentByIndex(i).LockForWriting();
//set new (completely locked) range
Interlocked.Exchange(ref _NewRange, newRange);
//calculate the step sizes for our switch points
var currentSwitchPointStep = (UInt32)(1 << _CurrentRange.Shift);
var newSwitchPointStep = (UInt32)(1 << newRange.Shift);
//position in new range up from where the new segments are locked
var newLockedPoint = (UInt32)0;
//At this moment _SwitchPoint should be 0
var switchPoint = (UInt32)_SwitchPoint;
do
{
Segment currentSegment;
do
{
//acquire segment to migrate
currentSegment = _CurrentRange.GetSegment(switchPoint);
//lock segment
currentSegment.LockForWriting();
if (currentSegment.IsAlive)
break;
//not alive: release it and try again
currentSegment.ReleaseForWriting();
}
while (true);
//migrate all items in the segment to the new range
TStored currentKey;
int it = -1;
while ((it = currentSegment.GetNextItem(it, out currentKey, this)) >= 0)
{
var currentKeyHash = this.GetItemHashCode(ref currentKey);
//get the new segment. this is already locked.
var newSegment = _NewRange.GetSegment(currentKeyHash);
TStored dummyKey;
newSegment.InsertItem(ref currentKey, out dummyKey, this);
}
//subtract allocated space from allocated space count and trash segment.
currentSegment.Bye(this);
currentSegment.ReleaseForWriting();
//true only for the last step before the switch point wraps around to 0
if (switchPoint == 0 - currentSwitchPointStep)
{
//We are about to wrap _SwitchPoint around.
//We have migrated all items from the entire table to the
//new range.
//Replace current with new before advancing, otherwise
//we would create a completely blocked table.
Interlocked.Exchange(ref _CurrentRange, newRange);
}
//advance _SwitchPoint; Interlocked.Add returns the new value
switchPoint = (UInt32)Interlocked.Add(ref _SwitchPoint, (Int32)currentSwitchPointStep);
//Release the locks of new segments up to the point where we can still add new items
//during this migration.
while (true)
{
var nextNewLockedPoint = newLockedPoint + newSwitchPointStep;
//stop when the next new segment would reach beyond the switch point, or when the
//locked point would wrap around to 0 (the final loop below handles the remainder).
if (nextNewLockedPoint > switchPoint || nextNewLockedPoint == 0)
break;
var newSegment = newRange.GetSegment(newLockedPoint);
newSegment.Trim(this);
var newSegmentSize = newSegment._Count;
totalNewSegmentSize += newSegmentSize;
if( newSegmentSize > largestSegmentSize )
{
largestSegmentSize = newSegmentSize;
largestSegment = newSegment;
}
newSegment.ReleaseForWriting();
newLockedPoint = nextNewLockedPoint;
}
}
//the switch point is 0 again once it has wrapped around: every current segment has been migrated
while (switchPoint != 0);
//unlock any remaining new segments; the loop ends when newLockedPoint wraps around to 0
while (newLockedPoint != 0)
{
var newSegment = newRange.GetSegment(newLockedPoint);
newSegment.Trim(this);
var newSegmentSize = newSegment._Count;
totalNewSegmentSize += newSegmentSize;
if( newSegmentSize > largestSegmentSize )
{
largestSegmentSize = newSegmentSize;
largestSegment = newSegment;
}
newSegment.ReleaseForWriting();
newLockedPoint += newSwitchPointStep;
}
}
}
}
#endregion
}
}