Efficient concurrent 'bag' implementation for Java

I need the most generic mutable collection implementation possible. The only operations I am interested in are:

Adding a new element to the collection;

Traversal in any order;

Finalizing the collection, preventing additional elements from being added.
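As a reference point, the three operations fit in a very small interface. The sketch below is only the naive baseline: the names `Bag`, `LockedBag`, `add`, `forEach` and `close` are hypothetical, and a single lock guards everything. It therefore does not meet the constraints that follow, because a traversal blocks every adder for its whole duration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical interface capturing the three operations from the question.
interface Bag<E> {
    /** Adds an element; returns false if the bag has already been closed. */
    boolean add(E element);

    /** Visits every element, in no particular order. */
    void forEach(Consumer<? super E> action);

    /** Prevents any further additions. */
    void close();
}

// Naive baseline: one lock guards everything, so a traversal blocks all adders.
final class LockedBag<E> implements Bag<E> {
    private final List<E> items = new ArrayList<>();
    private boolean closed;

    @Override
    public synchronized boolean add(E element) {
        if (closed) {
            return false;
        }
        items.add(element);
        return true;
    }

    @Override
    public synchronized void forEach(Consumer<? super E> action) {
        // Index-based loop: elements added by the action itself are still visited.
        for (int i = 0; i < items.size(); i++) {
            action.accept(items.get(i));
        }
    }

    @Override
    public synchronized void close() {
        closed = true;
    }
}
```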

Additional constraints apply:

Traversal should not prevent new elements from being added while it is in progress; rather, elements added during the traversal should be included in it.
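One way to get "elements added during the traversal are included" with a single lock-free list is to make the traversal re-read the head until it stops changing. The sketch below assumes a prepend-only linked list of immutable nodes (a Treiber stack); `GrowingStack` and its methods are hypothetical names. Because nodes are only ever prepended, every newer head's chain passes through every older head, so each pass only needs to walk the nodes added since the previous pass.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Sketch (hypothetical names): a lock-free prepend-only list (Treiber stack)
// whose traversal keeps re-reading the head until no new elements show up.
final class GrowingStack<E> {
    private static final class Node<E> {
        final E value;
        final Node<E> next;

        Node(E value, Node<E> next) {
            this.value = value;
            this.next = next;
        }
    }

    private final AtomicReference<Node<E>> head = new AtomicReference<>();

    void push(E value) {
        while (true) {
            Node<E> current = head.get();
            // Nodes are immutable, so a failed CAS just means "retry with the new head".
            if (head.compareAndSet(current, new Node<>(value, current))) {
                return;
            }
        }
    }

    /**
     * Visits every element, including ones pushed while the traversal runs,
     * and returns how many were visited. It only stops once the head is
     * stable, so a never-ending stream of pushes can keep it running.
     */
    int forEach(Consumer<? super E> action) {
        Node<E> alreadyVisited = null; // newest node covered by the previous pass
        int visited = 0;
        while (true) {
            Node<E> top = head.get();
            if (top == alreadyVisited) {
                return visited; // nothing was pushed since the last pass
            }
            // Only nodes pushed since the previous pass sit above alreadyVisited.
            for (Node<E> n = top; n != alreadyVisited; n = n.next) {
                action.accept(n.value);
                visited++;
            }
            alreadyVisited = top;
        }
    }
}
```

Elements pushed after the final "head is stable" check are of course not visited; closing the collection atomically at that point is a separate problem.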

Once a traversal completes, I need to atomically check a simple condition based on the result calculated for the traversed elements, and possibly close the collection for future additions.

Atomically means that no elements may be lost: any additions happening after the traversal completes, but before the decision is made, must wait for the result of that decision.

The append operation synchronously and atomically informs the caller whether it succeeded;
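For a single bucket, the "fold, decide, close" step can be made atomic without blocking adders at all. This is a swapped-in alternative to making late adders wait: the closed state lives in the same `AtomicReference` as the list head (as a marker node whose `next` keeps the list reachable), and closing is a CAS from exactly the head that the fold ended at. If an add slips in after the fold, that CAS fails, the new elements are folded in, and the condition is evaluated again. `ClosableStack`, `Outcome` and `foldAndMaybeClose` are hypothetical names; extending this to several buckets is exactly the hard part of the question and is not addressed here.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Predicate;

// Sketch under assumptions: single bucket; "closed" is a marker node stored in
// the head reference, so add() and close race on the same CAS.
final class ClosableStack<E> {
    private static final class Node<E> {
        final E value;
        final Node<E> next;
        final boolean closedMarker; // true only for the node installed by a close

        Node(E value, Node<E> next, boolean closedMarker) {
            this.value = value;
            this.next = next;
            this.closedMarker = closedMarker;
        }
    }

    static final class Outcome<A> {
        final A result;
        final boolean closed;

        Outcome(A result, boolean closed) {
            this.result = result;
            this.closed = closed;
        }
    }

    private final AtomicReference<Node<E>> head = new AtomicReference<>();

    /** Returns false once the bag is closed; the answer is final when returned. */
    boolean add(E value) {
        while (true) {
            Node<E> current = head.get();
            if (current != null && current.closedMarker) {
                return false;
            }
            if (head.compareAndSet(current, new Node<>(value, current, false))) {
                return true; // ordered before any close that succeeds later
            }
        }
    }

    /** Folds every element, then closes the bag iff shouldClose accepts the result. */
    <A> Outcome<A> foldAndMaybeClose(A initial, BiFunction<A, ? super E, A> step,
                                     Predicate<? super A> shouldClose) {
        A acc = initial;
        Node<E> folded = null; // newest node already included in acc
        while (true) {
            Node<E> top = head.get();
            boolean alreadyClosed = top != null && top.closedMarker;
            Node<E> newest = alreadyClosed ? top.next : top;
            for (Node<E> n = newest; n != folded; n = n.next) {
                acc = step.apply(acc, n.value);
            }
            folded = newest;
            if (alreadyClosed) {
                return new Outcome<>(acc, true);
            }
            if (!shouldClose.test(acc)) {
                return new Outcome<>(acc, false);
            }
            // Succeeds only if nothing was added since 'top' was read.
            if (head.compareAndSet(top, new Node<>(null, top, true))) {
                return new Outcome<>(acc, true);
            }
        }
    }
}
```

Each call keeps its accumulator in local variables, so two traversals running at the same time do not interfere; they simply both fold everything.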

The algorithm for folding/accumulating the elements is always the same, i.e., I do not need to worry about interference between traversals. It is not expected that more than a single thread will attempt a traversal at a time, but if it happens, the second one should simply wait for the other to complete if necessary.

The processing of the traversal's outcome may, however, happen asynchronously if needed; that is, it is enough for the traverse call to submit the processing for asynchronous execution, as long as it starts ASAP.
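The "one traversal at a time, processed asynchronously but started immediately" requirement can be sketched with standard `java.util.concurrent` pieces: `CompletableFuture.supplyAsync` hands the work to an executor right away, and a `ReentrantLock` makes a second traversal wait for the first. `SerializedTraversals` and `submit` are hypothetical names, and the choice of executor is left to the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Sketch (hypothetical helper): the caller returns immediately with a future,
// while the lock guarantees that at most one traversal body runs at a time.
final class SerializedTraversals {
    private final ReentrantLock traversalLock = new ReentrantLock();
    private final Executor executor;

    SerializedTraversals(Executor executor) {
        this.executor = executor;
    }

    /** Submits the traversal right away; overlapping submissions wait their turn. */
    <R> CompletableFuture<R> submit(Supplier<R> traversal) {
        return CompletableFuture.supplyAsync(() -> {
            traversalLock.lock();
            try {
                return traversal.get();
            } finally {
                traversalLock.unlock();
            }
        }, executor);
    }
}
```

A waiting traversal occupies an executor thread while it is blocked on the lock; since the question expects overlapping traversals to be rare, that seems an acceptable trade-off.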

Addition may happen in a contended environment; in that case, threads should not all synchronize on updating the same state (such as a reference to a linked list). To be more precise, an optimistic atomic test-and-set of a reference to an immutable list is acceptable by default, but failing it too many times globally per collection should make the collection fall back to something with less synchronization. (It is not the atomic reference update itself that is the problem, but the need to create a new object and perform additional operations between the read and the write.)
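The cost mentioned in parentheses can be reduced a little: the node can be allocated once per add and only re-linked on each retry, since it is not visible to other threads until the CAS that publishes it succeeds. The sketch below does that and counts failed CAS attempts in a `LongAdder`, which is designed to be incremented cheaply from many threads. `CountingStack` and `failedAttempts` are hypothetical names; what to do once the count gets high is left open.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;

// Sketch under assumptions: an optimistic CAS push that allocates its node once,
// re-links it on each retry, and counts failed CAS attempts globally.
final class CountingStack<E> {
    private static final class Node<E> {
        final E value;
        Node<E> next; // only written before the node is published by a successful CAS

        Node(E value) {
            this.value = value;
        }
    }

    private final AtomicReference<Node<E>> head = new AtomicReference<>();
    private final LongAdder casFailures = new LongAdder();

    void push(E value) {
        Node<E> node = new Node<>(value);
        while (true) {
            Node<E> current = head.get();
            node.next = current; // safe: the node is still private to this thread
            if (head.compareAndSet(current, node)) {
                return;
            }
            casFailures.increment(); // striped counter, cheap even under contention
        }
    }

    long failedAttempts() {
        return casFailures.sum(); // not an atomic snapshot, fine for a heuristic
    }

    int size() {
        int n = 0;
        for (Node<E> p = head.get(); p != null; p = p.next) {
            n++;
        }
        return n;
    }
}
```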

The collection should self-tune; the level of concurrency is not known in advance.

Broadly speaking, I imagine an array of linked lists, with each thread working on a bucket chosen by the hash of its thread id, where the size of the array may grow based on some heuristic contention detection. Is such adaptation even possible?
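The fixed-size version of the "array of linked lists" idea is straightforward: each slot of an `AtomicReferenceArray` is its own lock-free list head, and a thread picks its slot from a hash of its identity, so threads mostly CAS different slots. The sketch below assumes a power-of-two bucket count so a mask can select the slot; `StripedBag` is a hypothetical name, and it does not handle closing or growth.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;

// Sketch (hypothetical names): a fixed array of Treiber-stack buckets; each
// thread pushes into the bucket picked by a hash of its identity.
final class StripedBag<E> {
    private static final class Node<E> {
        final E value;
        final Node<E> next;

        Node(E value, Node<E> next) {
            this.value = value;
            this.next = next;
        }
    }

    private final AtomicReferenceArray<Node<E>> buckets;
    private final int mask;

    StripedBag(int stripesPowerOfTwo) {
        if (Integer.bitCount(stripesPowerOfTwo) != 1) {
            throw new IllegalArgumentException("stripe count must be a power of two");
        }
        buckets = new AtomicReferenceArray<>(stripesPowerOfTwo);
        mask = stripesPowerOfTwo - 1;
    }

    private int bucketForCurrentThread() {
        int h = System.identityHashCode(Thread.currentThread());
        h ^= (h >>> 16); // mix high bits into the low bits used by the mask
        return h & mask;
    }

    void add(E value) {
        int i = bucketForCurrentThread();
        while (true) {
            Node<E> current = buckets.get(i);
            if (buckets.compareAndSet(i, current, new Node<>(value, current))) {
                return;
            }
        }
    }

    void forEach(Consumer<? super E> action) {
        for (int i = 0; i < buckets.length(); i++) {
            for (Node<E> n = buckets.get(i); n != null; n = n.next) {
                action.accept(n.value);
            }
        }
    }
}
```

Adjacent slots of an `AtomicReferenceArray` can share a cache line, so heavily contended buckets may still slow each other down (false sharing); LongAdder internally pads its cells for this reason.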

To better illustrate what I have in mind, here is some very wrong pseudocode:

```java
boolean add(Elem elem) {
    if (closed())
        return false;
    if (this.state == TRAVERSAL) // obviously a race condition
        return beCarefulIGuess();
    // The collection starts with a single bucket, but that number may increase
    // based on the atomicFailureInc() step below.
    Bucket bucket = getBucket();
    Link list = bucket.list;
    Link newList = new Link(elem, list);
    int state;
    while ((state = atomicTestAndSet(bucket, list, newList)) != SUCCESS) {
        if (state == CLOSED) // Obviously, a race condition here...
            return false;
        if (state == TRAVERSAL) {
            // ???
        }
        if (state == TRAVERSAL_END)
            // Do not add new elements when the traversal happening now has finished
            // but has not yet decided whether further adding is allowed.
            wait();
        list = bucket.list;
        newList.tail = list;
        atomicFailureInc();
        // If some magical threshold is exceeded, the collection should convert
        // to more buckets, but not necessarily now.
    }
    return true;
}
```

I suspect a two-mode solution, either a single bucket or as many buckets as there are cores, may be enough. Failures could be counted with something like LongAdder, checking the total only once in a while.
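The two-mode idea avoids the hardest part of growing the array: if all per-core buckets are allocated up front, "growing" is just a one-way switch of the mask that selects a bucket, and traversal always visits every bucket. In the sketch below, `AdaptiveBag`, the caller-supplied failure threshold and the 1-in-16 sampling rate (via `ThreadLocalRandom`) are all assumptions; closing is not handled.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;

// Sketch of a one-way "single bucket -> one bucket per core" switch.
// The threshold, the sampling rate and all names are assumptions.
final class AdaptiveBag<E> {
    private static final class Node<E> {
        final E value;
        Node<E> next; // written only before a successful CAS publishes the node

        Node(E value) {
            this.value = value;
        }
    }

    private final AtomicReferenceArray<Node<E>> buckets; // allocated once, never resized
    private final int fullMask;
    private final long failureThreshold;
    private final LongAdder casFailures = new LongAdder();
    private volatile int activeMask = 0; // 0 means every thread uses bucket 0

    AdaptiveBag(long failureThreshold) {
        int cores = Runtime.getRuntime().availableProcessors();
        int stripes = 1;
        while (stripes < cores) {
            stripes <<= 1; // round up to a power of two so a mask picks the bucket
        }
        this.buckets = new AtomicReferenceArray<>(stripes);
        this.fullMask = stripes - 1;
        this.failureThreshold = failureThreshold;
    }

    void add(E value) {
        Node<E> node = new Node<>(value);
        int h = System.identityHashCode(Thread.currentThread());
        h ^= (h >>> 16);
        while (true) {
            int i = h & activeMask; // always 0 until contention has been detected
            Node<E> current = buckets.get(i);
            node.next = current;
            if (buckets.compareAndSet(i, current, node)) {
                return;
            }
            casFailures.increment();
            // sum() walks every LongAdder cell, so only sample it once in a while.
            if (activeMask == 0
                    && ThreadLocalRandom.current().nextInt(16) == 0
                    && casFailures.sum() > failureThreshold) {
                activeMask = fullMask; // one-way switch; racing writers store the same value
            }
        }
    }

    boolean isStriped() {
        return activeMask != 0;
    }

    void forEach(Consumer<? super E> action) {
        // Elements added to bucket 0 before the switch stay there, so visit all buckets.
        for (int i = 0; i < buckets.length(); i++) {
            for (Node<E> n = buckets.get(i); n != null; n = n.next) {
                action.accept(n.value);
            }
        }
    }
}
```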

As it's a hard problem, I would much rather rely on a trusted solution; however, it seems like a generally useful structure, so I hope someone smarter than me has come up with something better than the naive approach.
