[infinispan-dev] Stream operations under lock

[infinispan-dev] Stream operations under lock

William Burns-3
Some users have expressed the need for some sort of forEach operation where the Consumer is called while holding the lock for the given key, with the lock released after the Consumer operation completes.

Due to the nature of how streams work with retries and performing the operation on the primary owner, forEach lends itself to being done in an efficient way.

The problem is that this only really works well with non-tx and pessimistic tx. This obviously leaves out optimistic tx, which at first I was a little worried about. But after thinking about it more, this prelocking and optimistic tx don't really fit that well together anyway. So I am thinking that whenever this operation is performed in an optimistic transaction it would throw an exception, not letting the user use this feature.

Another question is what the API for this should look like. I was debating between 3 options myself:

1. AdvancedCache.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>> consumer)

This requires the fewest changes; however, the user can't customize certain parameters that CacheStream currently provides (listed below - the big one being filterKeys).

2. CacheStream.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>> consumer)
 
This method would only be allowed to be invoked on the Stream if no other intermediate operations were invoked; otherwise an exception would be thrown. This still gives us access to all of the CacheStream methods that aren't on the Stream interface (i.e. sequentialDistribution, parallelDistribution, parallel, sequential, filterKeys, filterKeySegments, distributedBatchSize, disableRehashAware, timeout).

3. LockedStream<CacheEntry<K, V>> AdvancedCache.lockedStream()

This requires the most changes; however, the API would be the most explicit. In this case the LockedStream would only expose the methods that are able to be invoked as noted above, plus forEach.

I personally feel that #3 might be the cleanest, but it obviously requires adding more classes. Let me know what you guys think and whether the optimistic exclusion is acceptable.
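To make the intended semantics concrete, here is a toy single-JVM sketch in plain Java (no Infinispan types; all names are hypothetical, this is not the proposed implementation): the per-key lock is acquired before the consumer runs and released after it completes, which is the behavior all three options would provide.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;

// Toy sketch of the forEach-under-lock semantics: for each entry, a
// per-key lock is held while the consumer runs, so no concurrent writer
// can slip in between the read and the consumer's update.
public class LockedStreamSketch<K, V> {
    private final ConcurrentHashMap<K, V> data = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<K, ReentrantLock> locks = new ConcurrentHashMap<>();

    public void put(K key, V value) { data.put(key, value); }
    public V get(K key) { return data.get(key); }

    // Analogous to the proposed forEachWithLock / LockedStream.forEach.
    public void forEachWithLock(BiConsumer<Map<K, V>, Map.Entry<K, V>> consumer) {
        for (Map.Entry<K, V> entry : data.entrySet()) {
            ReentrantLock lock = locks.computeIfAbsent(entry.getKey(), k -> new ReentrantLock());
            lock.lock();
            try {
                consumer.accept(data, entry);
            } finally {
                lock.unlock();  // released after the consumer completes
            }
        }
    }
}
```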

Thanks,

 - Will

_______________________________________________
infinispan-dev mailing list
[hidden email]
https://lists.jboss.org/mailman/listinfo/infinispan-dev

Re: [infinispan-dev] Stream operations under lock

Dan Berindei
I'm leaning towards option 1.

Are you thinking about also allowing the consumer to modify the entry,
like JCache's EntryProcessors? For a consumer that can only modify the
current entry, we could even "emulate" locking in an optimistic cache
by catching the WriteSkewException and running the consumer again.
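The retry idea Dan describes, reduced to a toy compare-and-set loop (no real WriteSkewException here - the failed compareAndSet plays its role; all names are hypothetical, not Infinispan code):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Toy stand-in for optimistic "emulated locking": apply the update
// against the value we read; if another writer changed it in the
// meantime (the write-skew case), run the consumer again on the
// fresh value until the commit succeeds.
public class OptimisticRetry {
    public static <V> V updateWithRetry(AtomicReference<V> entry, UnaryOperator<V> consumer) {
        while (true) {
            V before = entry.get();
            V after = consumer.apply(before);
            if (entry.compareAndSet(before, after)) {  // "commit" succeeds only if unchanged
                return after;
            }
            // write-skew detected: loop and re-run the consumer
        }
    }
}
```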

I wouldn't allow this to be mixed with other operations in a stream,
because then you may have to run filters/mappers/sorting while holding
the lock as well.

Cheers
Dan


On Tue, Mar 21, 2017 at 5:37 PM, William Burns <[hidden email]> wrote:

> Some users have expressed the need to have some sort of forEach operation
> that is performed where the Consumer is called while holding the lock for
> the given key and subsequently released after the Consumer operation
> completes.
>
> Due to the nature of how streams work with retries and performing the
> operation on the primary owner, this works out quite well with forEach to be
> done in an efficient way.
>
> The problem is that this only really works well with non tx and pessimistic
> tx. This obviously leaves out optimistic tx, which at first I was a little
> worried about. But after thinking about it more, this prelocking and
> optimistic tx don't really fit that well together anyways. So I am thinking
> whenever this operation is performed it would throw an exception not letting
> the user use this feature in optimistic transactions.
>
> Another question is what does the API for this look like. I was debating
> between 3 options myself:
>
> 1. AdvancedCache.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>>
> consumer)
>
> This require the least amount of changes, however the user can't customize
> certain parameters that CacheStream currently provides (listed below - big
> one being filterKeys).
>
> 2. CacheStream.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>> consumer)
>
> This method would only be allowed to be invoked on the Stream if no other
> intermediate operations were invoked, otherwise an exception would be
> thrown. This still gives us access to all of the CacheStream methods that
> aren't on the Stream interface (ie. sequentialDistribution,
> parallelDistribution, parallel, sequential, filterKeys, filterKeySegments,
> distributedBatchSize, disableRehashAware, timeout).
>
> 3. LockedStream<CacheEntry<K, V>> AdvancedCache.lockedStream()
>
> This requires the most changes, however the API would be the most explicit.
> In this case the LockedStream would only have the methods on it that are
> able to be invoked as noted above and forEach.
>
> I personally feel that #3 might be the cleanest, but obviously requires
> adding more classes. Let me know what you guys think and if you think the
> optimistic exclusion is acceptable.
>
> Thanks,
>
>  - Will
>

Re: [infinispan-dev] Stream operations under lock

Radim Vansa
In reply to this post by William Burns-3
On 03/21/2017 04:37 PM, William Burns wrote:
> Some users have expressed the need to have some sort of forEach
> operation that is performed where the Consumer is called while holding
> the lock for the given key and subsequently released after the
> Consumer operation completes.

Seconding Dan's question - is that intended to be able to modify the
entry? In my opinion, sending a function that will work on the
ReadWriteEntryView directly to the node is the only reasonable request.
I wouldn't like to see blocking operations in there.

>
> Due to the nature of how streams work with retries and performing the
> operation on the primary owner, this works out quite well with forEach
> to be done in an efficient way.
>
> The problem is that this only really works well with non tx and
> pessimistic tx. This obviously leaves out optimistic tx, which at
> first I was a little worried about. But after thinking about it more,
> this prelocking and optimistic tx don't really fit that well together
> anyways. So I am thinking whenever this operation is performed it
> would throw an exception not letting the user use this feature in
> optimistic transactions.

How exactly does reading streams interact with transactions? Does it wrap
read entries into the context? That would be a scalability issue.

I agree that "locking" should not be exposed with optimistic transactions.

With pessimistic transactions, how do you expect to handle locking
order? For regular operations, the user is responsible for setting up some
locking order so as not to get a deadlock. With a pessimistic
transaction, it's the cache itself that will order the calls. Also, if
you lock anything that is read, you just end up locking everything (or
getting a deadlock). If you don't, it's the same as issuing the lock and
reading again (to check the locked value) - but you'd do that internally
anyway. Therefore, I don't feel good about pessimistic transactions either.
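The lock-ordering discipline referred to above can be sketched in a few lines of toy Java (single JVM, hypothetical names, not Infinispan's lock manager): if a caller needs several key locks at once, acquiring them in one global order prevents two callers from each holding the lock the other wants next.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

public class OrderedLocking {
    static final ConcurrentHashMap<String, ReentrantLock> LOCKS = new ConcurrentHashMap<>();

    // Acquire every key lock in a single global order (sorted key order),
    // so threads locking {a, b} and {b, a} can never deadlock each other.
    public static List<ReentrantLock> lockAll(List<String> keys) {
        List<String> sorted = new ArrayList<>(keys);
        Collections.sort(sorted);
        List<ReentrantLock> held = new ArrayList<>();
        for (String key : sorted) {
            ReentrantLock lock = LOCKS.computeIfAbsent(key, k -> new ReentrantLock());
            lock.lock();
            held.add(lock);
        }
        return held;
    }

    public static void unlockAll(List<ReentrantLock> held) {
        for (ReentrantLock lock : held) {
            lock.unlock();
        }
    }
}
```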

>
> Another question is what does the API for this look like. I was
> debating between 3 options myself:
>
> 1. AdvancedCache.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>>
> consumer)
>
> This require the least amount of changes, however the user can't
> customize certain parameters that CacheStream currently provides
> (listed below - big one being filterKeys).
>
> 2. CacheStream.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>>
> consumer)
>
> This method would only be allowed to be invoked on the Stream if no
> other intermediate operations were invoked, otherwise an exception
> would be thrown. This still gives us access to all of the CacheStream
> methods that aren't on the Stream interface (ie.
> sequentialDistribution, parallelDistribution, parallel, sequential,
> filterKeys, filterKeySegments, distributedBatchSize,
> disableRehashAware, timeout).

For both options, I don't like Cache being passed around. You should
modify the CacheEntry (or some kind of view) directly.

Radim

>
> 3. LockedStream<CacheEntry<K, V>> AdvancedCache.lockedStream()
>
> This requires the most changes, however the API would be the most
> explicit. In this case the LockedStream would only have the methods on
> it that are able to be invoked as noted above and forEach.
>
> I personally feel that #3 might be the cleanest, but obviously
> requires adding more classes. Let me know what you guys think and if
> you think the optimistic exclusion is acceptable.
>
> Thanks,
>
>  - Will
>
>


--
Radim Vansa <[hidden email]>
JBoss Performance Team


Re: [infinispan-dev] Stream operations under lock

William Burns-3
In reply to this post by Dan Berindei


On Tue, Mar 21, 2017 at 12:17 PM Dan Berindei <[hidden email]> wrote:
I'm leaning towards option 1.

This is actually what I have implemented right now. The big problem I find is the lack of ways to configure the operation. For example, I know that users will require being able to pass in a set of keys. The more I think about it, a Predicate would also be preferred. This could be handled by having another override of the method that takes this Predicate or Set of keys. My main concern was whether users may want to tweak other settings such as timeout, parallel or sequential operation per node or cluster, and also batch size, possibly to reduce the chance of more-than-once operations. This ends up exploding the overrides, which is why I was hoping to use something like the Stream interface to handle each as a different method invocation. This is what brought me to think of #2, and subsequently #3 after I thought of excluding all those methods.
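The "exploding overrides" concern can be seen in a toy sketch: with a stream-like object, each tweak is its own chainable method, so forEach itself keeps a single signature (all names are hypothetical; fields are public only for brevity).

```java
import java.util.HashSet;
import java.util.Set;

// Toy sketch of the fluent alternative to forEachWithLock overloads:
// filterKeys, timeout and batch size each become a chainable setter
// instead of yet another forEachWithLock(...) variant.
public class LockedStreamConfigSketch<K> {
    public Set<K> filterKeys = new HashSet<>();
    public long timeoutMillis = 30_000;
    public int batchSize = 128;

    public LockedStreamConfigSketch<K> filterKeys(Set<K> keys) {
        this.filterKeys = keys;
        return this;
    }

    public LockedStreamConfigSketch<K> timeout(long millis) {
        this.timeoutMillis = millis;
        return this;
    }

    public LockedStreamConfigSketch<K> distributedBatchSize(int size) {
        this.batchSize = size;
        return this;
    }
}
```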
 

Are you thinking about also allowing the consumer to modify the entry,
like JCache's EntryProcessors? For a consumer that can only modify the
current entry, we could even "emulate" locking in an optimistic cache
by catching the WriteSkewException and running the consumer again.

That is one possible case (updating cache entry). To be honest the user could want it for other reasons that I am not privy to.

The catching of WriteSkewException is interesting, but I don't think it is acceptable. The problem is that the value could change between retries, which could in turn change what the user wants to write or which operation they perform, and there is no way to inform them.
 

I wouldn't allow this to be mixed with other operations in a stream,
because then you may have to run filters/mappers/sorting while holding
the lock as well.

That was my idea as well, and why I was preventing those in all three APIs. Although, as I mentioned above, allowing filter should be fine too. It also makes the process of locking and unlocking the entry quite cumbersome if we allowed other operations.
 

Cheers
Dan


On Tue, Mar 21, 2017 at 5:37 PM, William Burns <[hidden email]> wrote:
> Some users have expressed the need to have some sort of forEach operation
> that is performed where the Consumer is called while holding the lock for
> the given key and subsequently released after the Consumer operation
> completes.
>
> Due to the nature of how streams work with retries and performing the
> operation on the primary owner, this works out quite well with forEach to be
> done in an efficient way.
>
> The problem is that this only really works well with non tx and pessimistic
> tx. This obviously leaves out optimistic tx, which at first I was a little
> worried about. But after thinking about it more, this prelocking and
> optimistic tx don't really fit that well together anyways. So I am thinking
> whenever this operation is performed it would throw an exception not letting
> the user use this feature in optimistic transactions.
>
> Another question is what does the API for this look like. I was debating
> between 3 options myself:
>
> 1. AdvancedCache.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>>
> consumer)
>
> This require the least amount of changes, however the user can't customize
> certain parameters that CacheStream currently provides (listed below - big
> one being filterKeys).
>
> 2. CacheStream.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>> consumer)
>
> This method would only be allowed to be invoked on the Stream if no other
> intermediate operations were invoked, otherwise an exception would be
> thrown. This still gives us access to all of the CacheStream methods that
> aren't on the Stream interface (ie. sequentialDistribution,
> parallelDistribution, parallel, sequential, filterKeys, filterKeySegments,
> distributedBatchSize, disableRehashAware, timeout).
>
> 3. LockedStream<CacheEntry<K, V>> AdvancedCache.lockedStream()
>
> This requires the most changes, however the API would be the most explicit.
> In this case the LockedStream would only have the methods on it that are
> able to be invoked as noted above and forEach.
>
> I personally feel that #3 might be the cleanest, but obviously requires
> adding more classes. Let me know what you guys think and if you think the
> optimistic exclusion is acceptable.
>
> Thanks,
>
>  - Will
>

Re: [infinispan-dev] Stream operations under lock

William Burns-3
In reply to this post by Radim Vansa


On Tue, Mar 21, 2017 at 12:53 PM Radim Vansa <[hidden email]> wrote:
On 03/21/2017 04:37 PM, William Burns wrote:
> Some users have expressed the need to have some sort of forEach
> operation that is performed where the Consumer is called while holding
> the lock for the given key and subsequently released after the
> Consumer operation completes.

Seconding Dan's question - is that intended to be able to modify the
entry? In my opinion, sending a function that will work on the
ReadWriteEntryView directly to the node is the only reasonable request.
I wouldn't like to see blocking operations in there.

Hrmm, it seems the user could use the FunctionalMap interface for this then? I wonder if this should just be the API we go with. I will need to discuss the semantics of the evalAll/evalMany methods with Galder.
 

>
> Due to the nature of how streams work with retries and performing the
> operation on the primary owner, this works out quite well with forEach
> to be done in an efficient way.
>
> The problem is that this only really works well with non tx and
> pessimistic tx. This obviously leaves out optimistic tx, which at
> first I was a little worried about. But after thinking about it more,
> this prelocking and optimistic tx don't really fit that well together
> anyways. So I am thinking whenever this operation is performed it
> would throw an exception not letting the user use this feature in
> optimistic transactions.

How exactly reading streams interacts with transactions? Does it wrap
read entries into context? This would be a scalability issue.

It doesn't wrap read entries into the context for that exact reason. It does however use existing entries in the context to override ones in memory/store.
 

I agree that "locking" should not be exposed with optimistic transactions.

Yeah, I can't find a good way to do this really, and it seems to be the opposite of what optimistic transactions are.
 

With pessimistic transactions, how do you expect to handle locking
order? For regular operations, user is responsible for setting up some
locking order in order to not get a deadlock. With pessimistic
transaction, it's the cache itself who will order the calls. Also, if
you lock anything that is read, you just end up locking everything (or,
getting a deadlock). If you don't it's the same as issuing the lock and
reading again (to check the locked value) - but you'd do that internally
anyway. Therefore, I don't feel well about pessimistic transactions neither.

The lock is done per key, only for the duration of each invocation. There is no ordering, as only one lock is obtained at a time before moving to the next. If the user then acquires a lock for another key while in the Consumer, this could cause a deadlock if the inverse occurs on a different thread/node, but that is on the user. It is the same as it is today really, except we take the read lock for them before invoking their Consumer.
 

>
> Another question is what does the API for this look like. I was
> debating between 3 options myself:
>
> 1. AdvancedCache.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>>
> consumer)
>
> This require the least amount of changes, however the user can't
> customize certain parameters that CacheStream currently provides
> (listed below - big one being filterKeys).
>
> 2. CacheStream.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>>
> consumer)
>
> This method would only be allowed to be invoked on the Stream if no
> other intermediate operations were invoked, otherwise an exception
> would be thrown. This still gives us access to all of the CacheStream
> methods that aren't on the Stream interface (ie.
> sequentialDistribution, parallelDistribution, parallel, sequential,
> filterKeys, filterKeySegments, distributedBatchSize,
> disableRehashAware, timeout).

For both options, I don't like Cache being passed around. You should
modify the CacheEntry (or some kind of view) directly.

I don't know for sure if that is sufficient for the user. Sometimes they may want to modify another Cache given the value in this one, for example, which they could access from that Cache's CacheManager. Maybe Tristan knows more about some use cases.
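For reference, the "view" alternative Radim suggests amounts to handing the consumer a narrow writable handle on the current entry instead of the whole Cache. A toy sketch (hypothetical names, not Infinispan's ReadWriteEntryView):

```java
import java.util.Map;
import java.util.function.Consumer;

// Toy sketch of an entry view: the consumer can read and replace the
// current entry's value but cannot reach the rest of the cache.
public class EntryViewSketch {
    public interface WriteView<K, V> {
        K key();
        V get();
        void set(V value);
    }

    public static <K, V> void forEachView(Map<K, V> data, Consumer<WriteView<K, V>> consumer) {
        for (Map.Entry<K, V> entry : data.entrySet()) {
            consumer.accept(new WriteView<K, V>() {
                @Override public K key() { return entry.getKey(); }
                @Override public V get() { return entry.getValue(); }
                @Override public void set(V value) { data.put(entry.getKey(), value); }
            });
        }
    }
}
```

Whether such a view is enough is exactly the open question above: it cannot express "modify another Cache based on this value".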
 

Radim

>
> 3. LockedStream<CacheEntry<K, V>> AdvancedCache.lockedStream()
>
> This requires the most changes, however the API would be the most
> explicit. In this case the LockedStream would only have the methods on
> it that are able to be invoked as noted above and forEach.
>
> I personally feel that #3 might be the cleanest, but obviously
> requires adding more classes. Let me know what you guys think and if
> you think the optimistic exclusion is acceptable.
>
> Thanks,
>
>  - Will
>
>

Re: [infinispan-dev] Stream operations under lock

William Burns-3


On Tue, Mar 21, 2017 at 1:42 PM William Burns <[hidden email]> wrote:
On Tue, Mar 21, 2017 at 12:53 PM Radim Vansa <[hidden email]> wrote:
On 03/21/2017 04:37 PM, William Burns wrote:
> Some users have expressed the need to have some sort of forEach
> operation that is performed where the Consumer is called while holding
> the lock for the given key and subsequently released after the
> Consumer operation completes.

Seconding Dan's question - is that intended to be able to modify the
entry? In my opinion, sending a function that will work on the
ReadWriteEntryView directly to the node is the only reasonable request.
I wouldn't like to see blocking operations in there.

Hrmm, it seems the user could use the FunctionalMap interface for this then? I wonder if this should just be the API we go with. I will need to discuss the semantics of the evalAll/evalMany methods with Galder.

Actually, looking at evalAll, it seems it doesn't scale, as it keeps all entries in memory at once, so this is only usable for caches with a limited number of entries.
 
 

>
> Due to the nature of how streams work with retries and performing the
> operation on the primary owner, this works out quite well with forEach
> to be done in an efficient way.
>
> The problem is that this only really works well with non tx and
> pessimistic tx. This obviously leaves out optimistic tx, which at
> first I was a little worried about. But after thinking about it more,
> this prelocking and optimistic tx don't really fit that well together
> anyways. So I am thinking whenever this operation is performed it
> would throw an exception not letting the user use this feature in
> optimistic transactions.

How exactly reading streams interacts with transactions? Does it wrap
read entries into context? This would be a scalability issue.

It doesn't wrap read entries into the context for that exact reason. It does however use existing entries in the context to override ones in memory/store.
 

I agree that "locking" should not be exposed with optimistic transactions.

Yeah, I can't find a good way to do this really, and it seems to be the opposite of what optimistic transactions are.
 

With pessimistic transactions, how do you expect to handle locking
order? For regular operations, user is responsible for setting up some
locking order in order to not get a deadlock. With pessimistic
transaction, it's the cache itself who will order the calls. Also, if
you lock anything that is read, you just end up locking everything (or,
getting a deadlock). If you don't it's the same as issuing the lock and
reading again (to check the locked value) - but you'd do that internally
anyway. Therefore, I don't feel well about pessimistic transactions neither.

The lock is done per key, only for the duration of each invocation. There is no ordering, as only one lock is obtained at a time before moving to the next. If the user then acquires a lock for another key while in the Consumer, this could cause a deadlock if the inverse occurs on a different thread/node, but that is on the user. It is the same as it is today really, except we take the read lock for them before invoking their Consumer.
 

>
> Another question is what does the API for this look like. I was
> debating between 3 options myself:
>
> 1. AdvancedCache.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>>
> consumer)
>
> This require the least amount of changes, however the user can't
> customize certain parameters that CacheStream currently provides
> (listed below - big one being filterKeys).
>
> 2. CacheStream.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>>
> consumer)
>
> This method would only be allowed to be invoked on the Stream if no
> other intermediate operations were invoked, otherwise an exception
> would be thrown. This still gives us access to all of the CacheStream
> methods that aren't on the Stream interface (ie.
> sequentialDistribution, parallelDistribution, parallel, sequential,
> filterKeys, filterKeySegments, distributedBatchSize,
> disableRehashAware, timeout).

For both options, I don't like Cache being passed around. You should
modify the CacheEntry (or some kind of view) directly.

I don't know for sure if that is sufficient for the user. Sometimes they may want to modify another Cache given the value in this one, for example, which they could access from that Cache's CacheManager. Maybe Tristan knows more about some use cases.
 

Radim

>
> 3. LockedStream<CacheEntry<K, V>> AdvancedCache.lockedStream()
>
> This requires the most changes, however the API would be the most
> explicit. In this case the LockedStream would only have the methods on
> it that are able to be invoked as noted above and forEach.
>
> I personally feel that #3 might be the cleanest, but obviously
> requires adding more classes. Let me know what you guys think and if
> you think the optimistic exclusion is acceptable.
>
> Thanks,
>
>  - Will
>
>

Re: [infinispan-dev] Stream operations under lock

Tristan Tarrant-2
In reply to this post by William Burns-3


On 21/03/17 16:37, William Burns wrote:

> Some users have expressed the need to have some sort of forEach
> operation that is performed where the Consumer is called while holding
> the lock for the given key and subsequently released after the Consumer
> operation completes.
>
> Due to the nature of how streams work with retries and performing the
> operation on the primary owner, this works out quite well with forEach
> 1. AdvancedCache.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>>
> consumer)
>
> This require the least amount of changes, however the user can't
> customize certain parameters that CacheStream currently provides (listed
> below - big one being filterKeys).
>
> 2. CacheStream.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>> consumer)
>
> This method would only be allowed to be invoked on the Stream if no
> other intermediate operations were invoked, otherwise an exception would
> be thrown. This still gives us access to all of the CacheStream methods
> that aren't on the Stream interface (ie. sequentialDistribution,
> parallelDistribution, parallel, sequential, filterKeys,
> filterKeySegments, distributedBatchSize, disableRehashAware, timeout).
>
> 3. LockedStream<CacheEntry<K, V>> AdvancedCache.lockedStream()
>
> This requires the most changes, however the API would be the most
> explicit. In this case the LockedStream would only have the methods on
> it that are able to be invoked as noted above and forEach.
>
> I personally feel that #3 might be the cleanest, but obviously requires
> adding more classes. Let me know what you guys think and if you think
> the optimistic exclusion is acceptable.

I prefer option 3, because I don't like UnsupportedOperationExceptions
lurking until you hit runtime. Can you quantify the amount of extra
effort here?

Tristan

--
Tristan Tarrant
Infinispan Lead
JBoss, a division of Red Hat

Re: [infinispan-dev] Stream operations under lock

William Burns-3


On Tue, Mar 21, 2017 at 4:28 PM Tristan Tarrant <[hidden email]> wrote:


On 21/03/17 16:37, William Burns wrote:
> Some users have expressed the need to have some sort of forEach
> operation that is performed where the Consumer is called while holding
> the lock for the given key and subsequently released after the Consumer
> operation completes.
>
> Due to the nature of how streams work with retries and performing the
> operation on the primary owner, this works out quite well with forEach
> 1. AdvancedCache.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>>
> consumer)
>
> This require the least amount of changes, however the user can't
> customize certain parameters that CacheStream currently provides (listed
> below - big one being filterKeys).
>
> 2. CacheStream.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>> consumer)
>
> This method would only be allowed to be invoked on the Stream if no
> other intermediate operations were invoked, otherwise an exception would
> be thrown. This still gives us access to all of the CacheStream methods
> that aren't on the Stream interface (ie. sequentialDistribution,
> parallelDistribution, parallel, sequential, filterKeys,
> filterKeySegments, distributedBatchSize, disableRehashAware, timeout).
>
> 3. LockedStream<CacheEntry<K, V>> AdvancedCache.lockedStream()
>
> This requires the most changes, however the API would be the most
> explicit. In this case the LockedStream would only have the methods on
> it that are able to be invoked as noted above and forEach.
>
> I personally feel that #3 might be the cleanest, but obviously requires
> adding more classes. Let me know what you guys think and if you think
> the optimistic exclusion is acceptable.

I prefer option 3, because I don't like UnsupportedOperationExceptions
lurking until you hit runtime. Can you quantify the amount of extra
effort here?

I would probably add a new interface that defines the common methods between CacheStream and LockedStream (or whatever we call it). Then the actual LockedStream should be a self-contained class. Thinking about it more, it shouldn't require more than a couple hundred extra lines of code. The hard part was just getting the locking to work with non-tx and pessimistic tx, which is common to any impl.

Also, just to note, you would still get an UnsupportedOperationException when invoking the method in an optimistic tx.
 

Tristan


Re: [infinispan-dev] Stream operations under lock

Radim Vansa
In reply to this post by William Burns-3
On 03/21/2017 06:50 PM, William Burns wrote:

>
>
> On Tue, Mar 21, 2017 at 1:42 PM William Burns <[hidden email]
> <mailto:[hidden email]>> wrote:
>
>     On Tue, Mar 21, 2017 at 12:53 PM Radim Vansa <[hidden email]
>     <mailto:[hidden email]>> wrote:
>
>         On 03/21/2017 04:37 PM, William Burns wrote:
>         > Some users have expressed the need to have some sort of forEach
>         > operation that is performed where the Consumer is called
>         while holding
>         > the lock for the given key and subsequently released after the
>         > Consumer operation completes.
>
>         Seconding Dan's question - is that intended to be able to
>         modify the
>         entry? In my opinion, sending a function that will work on the
>         ReadWriteEntryView directly to the node is the only reasonable
>         request.
>         I wouldn't like to see blocking operations in there.
>
>
>     Hrmm the user can use the FunctionalMap interface for this then it
>     seems? I wonder if this should just be the going in API. I will
>     need to discuss with Galder the semantics of the evalAll/evalMany
>     methods.
>
>
> Actually looking at evalAll it seems it doesn't scale as it keeps all
> entries in memory at once, so this is only for caches with a limited
> amount of entries.

Don't look into the implementation; I think Galder has focused more on
the API side than on an optimal implementation. IMO there's no reason
evalAll should load all the entries into memory in non-transactional mode.

>
>         >
>         > Due to the nature of how streams work with retries and
>         performing the
>         > operation on the primary owner, this works out quite well
>         with forEach
>         > to be done in an efficient way.
>         >
>         > The problem is that this only really works well with non tx and
>         > pessimistic tx. This obviously leaves out optimistic tx,
>         which at
>         > first I was a little worried about. But after thinking about
>         it more,
>         > this prelocking and optimistic tx don't really fit that well
>         together
>         > anyways. So I am thinking whenever this operation is
>         performed it
>         > would throw an exception not letting the user use this
>         feature in
>         > optimistic transactions.
>
>         How exactly reading streams interacts with transactions? Does
>         it wrap
>         read entries into context? This would be a scalability issue.
>
>
>     It doesn't wrap read entries into the context for that exact
>     reason. It does however use existing entries in the context to
>     override ones in memory/store.
>

Uuh, so you end up with a copy of the cache in a single invocation
context, without any means to flush it. I think that we need to add an
InvocationContext.current().forget(key) API (throwing an exception if the
entry was modified) or something like that, even for the regular
streams. Maybe an override for the filter methods, too, because you want to
pass a nice predicate, but you can't just forget all filtered-out entries.

>
>         I agree that "locking" should not be exposed with optimistic
>         transactions.
>
>
>     Yeah I can't find a good way to do this really and it seems to be
>     opposite of what optimistic transactions are.
>
>
>         With pessimistic transactions, how do you expect to handle locking
>         order? For regular operations, user is responsible for setting
>         up some
>         locking order in order to not get a deadlock. With pessimistic
>         transaction, it's the cache itself who will order the calls.
>         Also, if
>         you lock anything that is read, you just end up locking
>         everything (or,
>         getting a deadlock). If you don't it's the same as issuing the
>         lock and
>         reading again (to check the locked value) - but you'd do that
>         internally
>         anyway. Therefore, I don't feel well about pessimistic
>         transactions neither.
>
>
>     The lock is done per key only for each invocation. There is no
>     ordering as only one is obtained at a time before it goes to the
>     next. If the user then acquires a lock for another key while in
>     the Consumer this could cause a deadlock if the inverse occurs on
>     a different thread/node, but this is on the user. It is the same
>     as it is today really, except we do the read lock for them before
>     invoking their Consumer.
>

In pessimistic mode, you should not release a lock before the end of the
transaction.

>
>         >
>         > Another question is what does the API for this look like. I was
>         > debating between 3 options myself:
>         >
>         > 1. AdvancedCache.forEachWithLock(BiConsumer<Cache,
>         CacheEntry<K, V>>
>         > consumer)
>         >
>         > This require the least amount of changes, however the user can't
>         > customize certain parameters that CacheStream currently provides
>         > (listed below - big one being filterKeys).
>         >
>         > 2. CacheStream.forEachWithLock(BiConsumer<Cache,
>         CacheEntry<K, V>>
>         > consumer)
>         >
>         > This method would only be allowed to be invoked on the
>         Stream if no
>         > other intermediate operations were invoked, otherwise an
>         exception
>         > would be thrown. This still gives us access to all of the
>         CacheStream
>         > methods that aren't on the Stream interface (ie.
>         > sequentialDistribution, parallelDistribution, parallel,
>         sequential,
>         > filterKeys, filterKeySegments, distributedBatchSize,
>         > disableRehashAware, timeout).
>
>         For both options, I don't like Cache being passed around. You
>         should
>         modify the CacheEntry (or some kind of view) directly.
>
>
>     I don't know for sure if that is sufficient for the user.
>     Sometimes they may modify another Cache given the value in this
>     one for example, which they could access from the CacheManager of
>     that Cache. Maybe Tristan knows more about some use cases.
>

Rather than guessing what the user could need, the Consumer could be CDI-enabled.

>
>         Radim
>
>         >
>         > 3. LockedStream<CacheEntry<K, V>> AdvancedCache.lockedStream()
>         >
>         > This requires the most changes, however the API would be the
>         most
>         > explicit. In this case the LockedStream would only have the
>         methods on
>         > it that are able to be invoked as noted above and forEach.
>         >
>         > I personally feel that #3 might be the cleanest, but obviously
>         > requires adding more classes. Let me know what you guys
>         think and if
>         > you think the optimistic exclusion is acceptable.
>         >
>         > Thanks,
>         >
>         >  - Will
>         >
>         >
>         > _______________________________________________
>         > infinispan-dev mailing list
>         > [hidden email]
>         <mailto:[hidden email]>
>         > https://lists.jboss.org/mailman/listinfo/infinispan-dev
>
>
>         --
>         Radim Vansa <[hidden email] <mailto:[hidden email]>>
>         JBoss Performance Team
>
>         _______________________________________________
>         infinispan-dev mailing list
>         [hidden email]
>         <mailto:[hidden email]>
>         https://lists.jboss.org/mailman/listinfo/infinispan-dev
>
>
>
> _______________________________________________
> infinispan-dev mailing list
> [hidden email]
> https://lists.jboss.org/mailman/listinfo/infinispan-dev


--
Radim Vansa <[hidden email]>
JBoss Performance Team


Re: [infinispan-dev] Stream operations under lock

William Burns-3


On Wed, Mar 22, 2017 at 5:51 AM Radim Vansa <[hidden email]> wrote:
On 03/21/2017 06:50 PM, William Burns wrote:
>
>
> On Tue, Mar 21, 2017 at 1:42 PM William Burns <[hidden email]
> <mailto:[hidden email]>> wrote:
>
>     On Tue, Mar 21, 2017 at 12:53 PM Radim Vansa <[hidden email]
>     <mailto:[hidden email]>> wrote:
>
>         On 03/21/2017 04:37 PM, William Burns wrote:
>         > Some users have expressed the need to have some sort of forEach
>         > operation that is performed where the Consumer is called
>         while holding
>         > the lock for the given key and subsequently released after the
>         > Consumer operation completes.
>
>         Seconding Dan's question - is that intended to be able to
>         modify the
>         entry? In my opinion, sending a function that will work on the
>         ReadWriteEntryView directly to the node is the only reasonable
>         request.
>         I wouldn't like to see blocking operations in there.
>
>
>     Hrmm the user can use the FunctionalMap interface for this then it
>     seems? I wonder if this should just be the going in API. I will
>     need to discuss with Galder the semantics of the evalAll/evalMany
>     methods.
>
>
> Actually looking at evalAll it seems it doesn't scale as it keeps all
> entries in memory at once, so this is only for caches with a limited
> amount of entries.

Don't look into the implementation; I think Galder has focused more on
the API side than on an optimal implementation. IMO there's no reason
evalAll should load all the entries into memory in non-transactional mode.


I agree that it shouldn't, but there is no guarantee this will be ready any time soon.
 
>
>         >
>         > Due to the nature of how streams work with retries and
>         performing the
>         > operation on the primary owner, this works out quite well
>         with forEach
>         > to be done in an efficient way.
>         >
>         > The problem is that this only really works well with non tx and
>         > pessimistic tx. This obviously leaves out optimistic tx,
>         which at
>         > first I was a little worried about. But after thinking about
>         it more,
>         > this prelocking and optimistic tx don't really fit that well
>         together
>         > anyways. So I am thinking whenever this operation is
>         performed it
>         > would throw an exception not letting the user use this
>         feature in
>         > optimistic transactions.
>
>         How exactly reading streams interacts with transactions? Does
>         it wrap
>         read entries into context? This would be a scalability issue.
>
>
>     It doesn't wrap read entries into the context for that exact
>     reason. It does however use existing entries in the context to
>     override ones in memory/store.
>

Uuh, so you end up with a copy of the cache in a single invocation
context, without any means to flush it. I think that we need to add

Maybe I worded it poorly. Streams don't wrap any entries at all. All they do is read from the current context if there are any entries there, then read from the data container (skipping entries already read from the context), and finally read from the store if present.
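That read order (context overrides data container, which overrides the store) can be illustrated with plain Maps standing in for the real components; note the real stream iterates lazily rather than materializing a snapshot like this sketch does:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrates only the precedence order: invocation-context entries override
// the data container, which overrides the store. Plain Maps are stand-ins;
// the real implementation streams lazily instead of building a snapshot.
class StreamReadOrder {
    static <K, V> Map<K, V> snapshot(Map<K, V> context,
                                     Map<K, V> container,
                                     Map<K, V> store) {
        Map<K, V> result = new LinkedHashMap<>();
        result.putAll(context);                 // 1. context wins
        container.forEach(result::putIfAbsent); // 2. container, minus context keys
        store.forEach(result::putIfAbsent);     // 3. store, minus loaded keys
        return result;
    }
}
```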

Although the more I think about it, using Stream with lock may make this a non-issue; read below.
 
InvocationContext.current().forget(key) API (throwing exception if the
entry was modified) or something like that, even for the regular
streams. Maybe an override for filter methods, too, because you want to
pass a nice predicate, but you can't just forget all filtered out entries.

>
>         I agree that "locking" should not be exposed with optimistic
>         transactions.
>
>
>     Yeah I can't find a good way to do this really and it seems to be
>     opposite of what optimistic transactions are.
>
>
>         With pessimistic transactions, how do you expect to handle locking
>         order? For regular operations, user is responsible for setting
>         up some
>         locking order in order to not get a deadlock. With pessimistic
>         transaction, it's the cache itself who will order the calls.
>         Also, if
>         you lock anything that is read, you just end up locking
>         everything (or,
>         getting a deadlock). If you don't it's the same as issuing the
>         lock and
>         reading again (to check the locked value) - but you'd do that
>         internally
>         anyway. Therefore, I don't feel well about pessimistic
>         transactions neither.
>
>
>     The lock is done per key only for each invocation. There is no
>     ordering as only one is obtained at a time before it goes to the
>     next. If the user then acquires a lock for another key while in
>     the Consumer this could cause a deadlock if the inverse occurs on
>     a different thread/node, but this is on the user. It is the same
>     as it is today really, except we do the read lock for them before
>     invoking their Consumer.
>

In pessimistic mode, you should not release a lock before the end of the
transaction.

I agree. I didn't discuss the finer details, but what I have right now doesn't work with an ongoing pessimistic transaction. To be honest, I am not sure if this can work with an ongoing transaction at all. Even if it did, it would be horrendously slow, since each remote node performing the Consumer would need to read from the originator's context (or have it copied over). Unless someone with more transaction knowledge knows of a better way, I personally feel using stream with lock should run in its own dedicated transaction.
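What "its own dedicated transaction" could look like, modeled with a stand-in Tx interface (not javax.transaction; an assumption purely for illustration):

```java
import java.util.List;
import java.util.function.Consumer;

// Toy model of "stream with lock runs in its own dedicated transaction":
// each Consumer invocation gets a fresh, short-lived transaction instead of
// enlisting every key in one ongoing transaction. Tx is a stand-in interface,
// not javax.transaction; this is an assumption for illustration.
class PerEntryTx {
    interface Tx {
        void begin();
        void commit();
        void rollback();
    }

    static <T> void forEachInOwnTx(List<T> entries, Tx tx, Consumer<T> consumer) {
        for (T entry : entries) {
            tx.begin();              // fresh transaction per entry
            try {
                consumer.accept(entry);
                tx.commit();         // lock held only for this one entry
            } catch (RuntimeException e) {
                tx.rollback();       // a failure affects only this entry
            }
        }
    }
}
```

This also sidesteps the locked-keys-growth problem: no single transaction ever holds more than one of the stream's locks.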
 

>
>         >
>         > Another question is what does the API for this look like. I was
>         > debating between 3 options myself:
>         >
>         > 1. AdvancedCache.forEachWithLock(BiConsumer<Cache,
>         CacheEntry<K, V>>
>         > consumer)
>         >
>         > This require the least amount of changes, however the user can't
>         > customize certain parameters that CacheStream currently provides
>         > (listed below - big one being filterKeys).
>         >
>         > 2. CacheStream.forEachWithLock(BiConsumer<Cache,
>         CacheEntry<K, V>>
>         > consumer)
>         >
>         > This method would only be allowed to be invoked on the
>         Stream if no
>         > other intermediate operations were invoked, otherwise an
>         exception
>         > would be thrown. This still gives us access to all of the
>         CacheStream
>         > methods that aren't on the Stream interface (ie.
>         > sequentialDistribution, parallelDistribution, parallel,
>         sequential,
>         > filterKeys, filterKeySegments, distributedBatchSize,
>         > disableRehashAware, timeout).
>
>         For both options, I don't like Cache being passed around. You
>         should
>         modify the CacheEntry (or some kind of view) directly.
>
>
>     I don't know for sure if that is sufficient for the user.
>     Sometimes they may modify another Cache given the value in this
>     one for example, which they could access from the CacheManager of
>     that Cache. Maybe Tristan knows more about some use cases.
>

Rather than guessing what the user could need, the Consumer could be CDI-enabled.

I am not the biggest CDI fan, especially since the benefit of this approach is that you can use lambdas (which automatically become Serializable) and write very concise code. But others can chime in here too.
 

>
>         Radim
>
>         >
>         > 3. LockedStream<CacheEntry<K, V>> AdvancedCache.lockedStream()
>         >
>         > This requires the most changes, however the API would be the
>         most
>         > explicit. In this case the LockedStream would only have the
>         methods on
>         > it that are able to be invoked as noted above and forEach.
>         >
>         > I personally feel that #3 might be the cleanest, but obviously
>         > requires adding more classes. Let me know what you guys
>         think and if
>         > you think the optimistic exclusion is acceptable.
>         >
>         > Thanks,
>         >
>         >  - Will
>         >
>         >
>         > _______________________________________________
>         > infinispan-dev mailing list
>         > [hidden email]
>         <mailto:[hidden email]>
>         > https://lists.jboss.org/mailman/listinfo/infinispan-dev
>
>
>         --
>         Radim Vansa <[hidden email] <mailto:[hidden email]>>
>         JBoss Performance Team
>
>         _______________________________________________
>         infinispan-dev mailing list
>         [hidden email]
>         <mailto:[hidden email]>
>         https://lists.jboss.org/mailman/listinfo/infinispan-dev
>
>
>
> _______________________________________________
> infinispan-dev mailing list
> [hidden email]
> https://lists.jboss.org/mailman/listinfo/infinispan-dev


--
Radim Vansa <[hidden email]>
JBoss Performance Team


Re: [infinispan-dev] Stream operations under lock

Dan Berindei
In reply to this post by Radim Vansa
On Wed, Mar 22, 2017 at 11:51 AM, Radim Vansa <[hidden email]> wrote:

> On 03/21/2017 06:50 PM, William Burns wrote:
>>
>>
>> On Tue, Mar 21, 2017 at 1:42 PM William Burns <[hidden email]
>> <mailto:[hidden email]>> wrote:
>>
>>     On Tue, Mar 21, 2017 at 12:53 PM Radim Vansa <[hidden email]
>>     <mailto:[hidden email]>> wrote:
>>
>>         On 03/21/2017 04:37 PM, William Burns wrote:
>>         > Some users have expressed the need to have some sort of forEach
>>         > operation that is performed where the Consumer is called
>>         while holding
>>         > the lock for the given key and subsequently released after the
>>         > Consumer operation completes.
>>
>>         Seconding Dan's question - is that intended to be able to
>>         modify the
>>         entry? In my opinion, sending a function that will work on the
>>         ReadWriteEntryView directly to the node is the only reasonable
>>         request.
>>         I wouldn't like to see blocking operations in there.
>>
>>
>>     Hrmm the user can use the FunctionalMap interface for this then it
>>     seems? I wonder if this should just be the going in API. I will
>>     need to discuss with Galder the semantics of the evalAll/evalMany
>>     methods.
>>
>>
>> Actually looking at evalAll it seems it doesn't scale as it keeps all
>> entries in memory at once, so this is only for caches with a limited
>> amount of entries.
>
> Don't look into the implementation; I think Galder has focused more on
> the API side than having optimal implementation. IMO there's no reason
> evalAll should load all the entries into memory in non-transactional mode.
>

I'm pretty sure we do need to load all the entries in order to provide
REPEATABLE_READ isolation.

>>
>>         >
>>         > Due to the nature of how streams work with retries and
>>         performing the
>>         > operation on the primary owner, this works out quite well
>>         with forEach
>>         > to be done in an efficient way.
>>         >
>>         > The problem is that this only really works well with non tx and
>>         > pessimistic tx. This obviously leaves out optimistic tx,
>>         which at
>>         > first I was a little worried about. But after thinking about
>>         it more,
>>         > this prelocking and optimistic tx don't really fit that well
>>         together
>>         > anyways. So I am thinking whenever this operation is
>>         performed it
>>         > would throw an exception not letting the user use this
>>         feature in
>>         > optimistic transactions.
>>
>>         How exactly reading streams interacts with transactions? Does
>>         it wrap
>>         read entries into context? This would be a scalability issue.
>>
>>
>>     It doesn't wrap read entries into the context for that exact
>>     reason. It does however use existing entries in the context to
>>     override ones in memory/store.
>>
>
> Uuh, so you end up with a copy of the cache in single invocation
> context, without any means to flush it. I think that we need add
> InvocationContext.current().forget(key) API (throwing exception if the
> entry was modified) or something like that, even for the regular
> streams. Maybe an override for filter methods, too, because you want to
> pass a nice predicate, but you can't just forget all filtered out entries.
>

I think Will said he *doesn't* want to wrap the entries read by the consumer :)

IMO there's no "good" way to provide repeatable read isolation for a
transaction that reads all the keys in the cache, so this API should
create a separate transaction for each entry. I wouldn't try to make
the consumers see the current transaction's modifications either; I'd
throw an exception if this is started from within a transaction instead.

>>
>>         I agree that "locking" should not be exposed with optimistic
>>         transactions.
>>
>>
>>     Yeah I can't find a good way to do this really and it seems to be
>>     opposite of what optimistic transactions are.
>>

Ok, the name forEachWithLock doesn't really fit with optimistic
locking, but I think with a more neutral name it could work for
optimistic caches as well.

>>
>>         With pessimistic transactions, how do you expect to handle locking
>>         order? For regular operations, user is responsible for setting
>>         up some
>>         locking order in order to not get a deadlock. With pessimistic
>>         transaction, it's the cache itself who will order the calls.
>>         Also, if
>>         you lock anything that is read, you just end up locking
>>         everything (or,
>>         getting a deadlock). If you don't it's the same as issuing the
>>         lock and
>>         reading again (to check the locked value) - but you'd do that
>>         internally
>>         anyway. Therefore, I don't feel well about pessimistic
>>         transactions neither.
>>
>>
>>     The lock is done per key only for each invocation. There is no
>>     ordering as only one is obtained at a time before it goes to the
>>     next. If the user then acquires a lock for another key while in
>>     the Consumer this could cause a deadlock if the inverse occurs on
>>     a different thread/node, but this is on the user. It is the same
>>     as it is today really, except we do the read lock for them before
>>     invoking their Consumer.
>>
>
> In pessimistic mode, you should not release a lock before the end of the
> transaction.
>

Exactly. Each consumer needs to have its own transaction, otherwise
the transaction's lockedKeys collection would have to grow to include
all the keys in the cache.

>>
>>         >
>>         > Another question is what does the API for this look like. I was
>>         > debating between 3 options myself:
>>         >
>>         > 1. AdvancedCache.forEachWithLock(BiConsumer<Cache,
>>         CacheEntry<K, V>>
>>         > consumer)
>>         >
>>         > This require the least amount of changes, however the user can't
>>         > customize certain parameters that CacheStream currently provides
>>         > (listed below - big one being filterKeys).
>>         >
>>         > 2. CacheStream.forEachWithLock(BiConsumer<Cache,
>>         CacheEntry<K, V>>
>>         > consumer)
>>         >
>>         > This method would only be allowed to be invoked on the
>>         Stream if no
>>         > other intermediate operations were invoked, otherwise an
>>         exception
>>         > would be thrown. This still gives us access to all of the
>>         CacheStream
>>         > methods that aren't on the Stream interface (ie.
>>         > sequentialDistribution, parallelDistribution, parallel,
>>         sequential,
>>         > filterKeys, filterKeySegments, distributedBatchSize,
>>         > disableRehashAware, timeout).
>>
>>         For both options, I don't like Cache being passed around. You
>>         should
>>         modify the CacheEntry (or some kind of view) directly.
>>
>>
>>     I don't know for sure if that is sufficient for the user.
>>     Sometimes they may modify another Cache given the value in this
>>     one for example, which they could access from the CacheManager of
>>     that Cache. Maybe Tristan knows more about some use cases.
>>
>
> Rather than guessing what could the user need, the Consumer could be CDI
> enabled.
>

If the user actually needs to work with more than one entry at a time,
I think it would be much cleaner for them to use the regular forEach() and
start an explicit transaction in the consumer.

>>
>>         Radim
>>
>>         >
>>         > 3. LockedStream<CacheEntry<K, V>> AdvancedCache.lockedStream()
>>         >
>>         > This requires the most changes, however the API would be the
>>         most
>>         > explicit. In this case the LockedStream would only have the
>>         methods on
>>         > it that are able to be invoked as noted above and forEach.
>>         >
>>         > I personally feel that #3 might be the cleanest, but obviously
>>         > requires adding more classes. Let me know what you guys
>>         think and if
>>         > you think the optimistic exclusion is acceptable.
>>         >
>>         > Thanks,
>>         >
>>         >  - Will
>>         >

Re: [infinispan-dev] Stream operations under lock

Galder Zamarreño
In reply to this post by Dan Berindei

--
Galder Zamarreño
Infinispan, Red Hat

> On 21 Mar 2017, at 17:16, Dan Berindei <[hidden email]> wrote:
>
> I'm leaning towards option 1.
>
> Are you thinking about also allowing the consumer to modify the entry,
> like JCache's EntryProcessors? For a consumer that can only modify the
> current entry, we could even "emulate" locking in an optimistic cache
> by catching the WriteSkewException and running the consumer again.
>
> I wouldn't allow this to be mixed with other operations in a stream,
> because then you may have to run filters/mappers/sorting while holding
> the lock as well.

^ Would forEach w/ lock still run for all entries in the originator? If so, not being able to filter could be a pain. IOW, you'd be forcing all entries to be shipped to a node and the user to do their own filtering. Not ideal :\
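Dan's suggestion above, emulating locking in an optimistic cache by catching the WriteSkewException and running the consumer again, could be sketched roughly like this (WriteSkewException below is a local stand-in class, not the real Infinispan exception):

```java
// Sketch of emulating the lock in an optimistic cache: re-run the consumer
// when the commit-time write-skew check fails. WriteSkewException here is a
// local stand-in class, not the real Infinispan exception.
class OptimisticRetry {
    static class WriteSkewException extends RuntimeException {}

    interface TxBlock {
        void run(); // one optimistic read-modify-write attempt
    }

    static void runWithRetry(TxBlock block, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                block.run();
                return;                               // committed cleanly
            } catch (WriteSkewException e) {
                if (attempt >= maxAttempts) throw e;  // give up eventually
                // otherwise loop: re-read the entry and re-apply the consumer
            }
        }
    }
}
```

The retry is only safe if the consumer can be re-applied to a freshly read entry, which is why it fits a consumer that modifies only the current entry.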


>
> Cheers
> Dan
>
>
> On Tue, Mar 21, 2017 at 5:37 PM, William Burns <[hidden email]> wrote:
>> Some users have expressed the need to have some sort of forEach operation
>> that is performed where the Consumer is called while holding the lock for
>> the given key and subsequently released after the Consumer operation
>> completes.
>>
>> Due to the nature of how streams work with retries and performing the
>> operation on the primary owner, this works out quite well with forEach to be
>> done in an efficient way.
>>
>> The problem is that this only really works well with non tx and pessimistic
>> tx. This obviously leaves out optimistic tx, which at first I was a little
>> worried about. But after thinking about it more, this prelocking and
>> optimistic tx don't really fit that well together anyways. So I am thinking
>> whenever this operation is performed it would throw an exception not letting
>> the user use this feature in optimistic transactions.
>>
>> Another question is what does the API for this look like. I was debating
>> between 3 options myself:
>>
>> 1. AdvancedCache.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>>
>> consumer)
>>
>> This require the least amount of changes, however the user can't customize
>> certain parameters that CacheStream currently provides (listed below - big
>> one being filterKeys).
>>
>> 2. CacheStream.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>> consumer)
>>
>> This method would only be allowed to be invoked on the Stream if no other
>> intermediate operations were invoked, otherwise an exception would be
>> thrown. This still gives us access to all of the CacheStream methods that
>> aren't on the Stream interface (ie. sequentialDistribution,
>> parallelDistribution, parallel, sequential, filterKeys, filterKeySegments,
>> distributedBatchSize, disableRehashAware, timeout).
>>
>> 3. LockedStream<CacheEntry<K, V>> AdvancedCache.lockedStream()
>>
>> This requires the most changes, however the API would be the most explicit.
>> In this case the LockedStream would only have the methods on it that are
>> able to be invoked as noted above and forEach.
>>
>> I personally feel that #3 might be the cleanest, but obviously requires
>> adding more classes. Let me know what you guys think and if you think the
>> optimistic exclusion is acceptable.
>>
>> Thanks,
>>
>> - Will
>>
>> _______________________________________________
>> infinispan-dev mailing list
>> [hidden email]
>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
> _______________________________________________
> infinispan-dev mailing list
> [hidden email]
> https://lists.jboss.org/mailman/listinfo/infinispan-dev



Re: [infinispan-dev] Stream operations under lock

Galder Zamarreño
In reply to this post by Radim Vansa

--
Galder Zamarreño
Infinispan, Red Hat

> On 22 Mar 2017, at 10:51, Radim Vansa <[hidden email]> wrote:
>
> On 03/21/2017 06:50 PM, William Burns wrote:
>>
>>
>> On Tue, Mar 21, 2017 at 1:42 PM William Burns <[hidden email]
>> <mailto:[hidden email]>> wrote:
>>
>>    On Tue, Mar 21, 2017 at 12:53 PM Radim Vansa <[hidden email]
>>    <mailto:[hidden email]>> wrote:
>>
>>        On 03/21/2017 04:37 PM, William Burns wrote:
>>> Some users have expressed the need to have some sort of forEach
>>> operation that is performed where the Consumer is called
>>        while holding
>>> the lock for the given key and subsequently released after the
>>> Consumer operation completes.
>>
>>        Seconding Dan's question - is that intended to be able to
>>        modify the
>>        entry? In my opinion, sending a function that will work on the
>>        ReadWriteEntryView directly to the node is the only reasonable
>>        request.
>>        I wouldn't like to see blocking operations in there.
>>
>>
>>    Hrmm the user can use the FunctionalMap interface for this then it
>>    seems? I wonder if this should just be the going in API. I will
>>    need to discuss with Galder the semantics of the evalAll/evalMany
>>    methods.
>>
>>
>> Actually looking at evalAll it seems it doesn't scale as it keeps all
>> entries in memory at once, so this is only for caches with a limited
>> amount of entries.
>
> Don't look into the implementation; I think Galder has focused more on
> the API side than having optimal implementation.

That's why it's marked experimental ;p

> IMO there's no reason
> evalAll should load all the entries into memory in non-transactional mode.
>
>>
>>>
>>> Due to the nature of how streams work with retries and
>>        performing the
>>> operation on the primary owner, this works out quite well
>>        with forEach
>>> to be done in an efficient way.
>>>
>>> The problem is that this only really works well with non tx and
>>> pessimistic tx. This obviously leaves out optimistic tx,
>>        which at
>>> first I was a little worried about. But after thinking about
>>        it more,
>>> this prelocking and optimistic tx don't really fit that well
>>        together
>>> anyways. So I am thinking whenever this operation is
>>        performed it
>>> would throw an exception not letting the user use this
>>        feature in
>>> optimistic transactions.
>>
>>        How exactly reading streams interacts with transactions? Does
>>        it wrap
>>        read entries into context? This would be a scalability issue.
>>
>>
>>    It doesn't wrap read entries into the context for that exact
>>    reason. It does however use existing entries in the context to
>>    override ones in memory/store.
>>
>
> Uuh, so you end up with a copy of the cache in single invocation
> context, without any means to flush it. I think that we need add
> InvocationContext.current().forget(key) API (throwing exception if the
> entry was modified) or something like that, even for the regular
> streams. Maybe an override for filter methods, too, because you want to
> pass a nice predicate, but you can't just forget all filtered out entries.
>
>>
>>        I agree that "locking" should not be exposed with optimistic
>>        transactions.
>>
>>
>>    Yeah I can't find a good way to do this really and it seems to be
>>    opposite of what optimistic transactions are.
>>
>>
>>        With pessimistic transactions, how do you expect to handle locking
>>        order? For regular operations, user is responsible for setting
>>        up some
>>        locking order in order to not get a deadlock. With pessimistic
>>        transaction, it's the cache itself who will order the calls.
>>        Also, if
>>        you lock anything that is read, you just end up locking
>>        everything (or,
>>        getting a deadlock). If you don't it's the same as issuing the
>>        lock and
>>        reading again (to check the locked value) - but you'd do that
>>        internally
>>        anyway. Therefore, I don't feel well about pessimistic
>>        transactions either.
>>
>>
>>    The lock is done per key only for each invocation. There is no
>>    ordering as only one is obtained at a time before it goes to the
>>    next. If the user then acquires a lock for another key while in
>>    the Consumer this could cause a deadlock if the inverse occurs on
>>    a different thread/node, but this is on the user. It is the same
>>    as it is today really, except we do the read lock for them before
>>    invoking their Consumer.
>>
>
> In pessimistic mode, you should not release a lock before the end of the
> transaction.
>
>>
>>>
>>> Another question is what does the API for this look like. I was
>>> debating between 3 options myself:
>>>
>>> 1. AdvancedCache.forEachWithLock(BiConsumer<Cache,
>>        CacheEntry<K, V>>
>>> consumer)
>>>
>>> This requires the least amount of changes, however the user can't
>>> customize certain parameters that CacheStream currently provides
>>> (listed below - big one being filterKeys).
>>>
>>> 2. CacheStream.forEachWithLock(BiConsumer<Cache,
>>        CacheEntry<K, V>>
>>> consumer)
>>>
>>> This method would only be allowed to be invoked on the
>>        Stream if no
>>> other intermediate operations were invoked, otherwise an
>>        exception
>>> would be thrown. This still gives us access to all of the
>>        CacheStream
>>> methods that aren't on the Stream interface (ie.
>>> sequentialDistribution, parallelDistribution, parallel,
>>        sequential,
>>> filterKeys, filterKeySegments, distributedBatchSize,
>>> disableRehashAware, timeout).
>>
>>        For both options, I don't like Cache being passed around. You
>>        should
>>        modify the CacheEntry (or some kind of view) directly.
>>
>>
>>    I don't know for sure if that is sufficient for the user.
>>    Sometimes they may modify another Cache given the value in this
>>    one for example, which they could access from the CacheManager of
>>    that Cache. Maybe Tristan knows more about some use cases.
>>
>
> Rather than guessing what the user could need, the Consumer could be
> CDI enabled.


_______________________________________________
infinispan-dev mailing list
[hidden email]
https://lists.jboss.org/mailman/listinfo/infinispan-dev

Re: [infinispan-dev] Stream operations under lock

Galder Zamarreño
In reply to this post by William Burns-3

--
Galder Zamarreño
Infinispan, Red Hat

On 21 Mar 2017, at 18:50, William Burns <[hidden email]> wrote:



On Tue, Mar 21, 2017 at 1:42 PM William Burns <[hidden email]> wrote:
On Tue, Mar 21, 2017 at 12:53 PM Radim Vansa <[hidden email]> wrote:
On 03/21/2017 04:37 PM, William Burns wrote:
Some users have expressed the need to have some sort of forEach
operation that is performed where the Consumer is called while holding
the lock for the given key and subsequently released after the
Consumer operation completes.

Seconding Dan's question - is that intended to be able to modify the
entry? In my opinion, sending a function that will work on the
ReadWriteEntryView directly to the node is the only reasonable request.
I wouldn't like to see blocking operations in there.

Hrmm, the user can use the FunctionalMap interface for this then, it seems? I wonder if this should just be the API going in. I will need to discuss with Galder the semantics of the evalAll/evalMany methods.

Actually looking at evalAll it seems it doesn't scale as it keeps all entries in memory at once, so this is only for caches with a limited amount of entries.

^ I might be wrong but didn't forEach work this way? I probably looked at that when trying to implement evalAll






Re: [infinispan-dev] Stream operations under lock

William Burns-3


On Tue, Mar 28, 2017 at 9:27 AM Galder Zamarreño <[hidden email]> wrote:

--
Galder Zamarreño
Infinispan, Red Hat

On 21 Mar 2017, at 18:50, William Burns <[hidden email]> wrote:



On Tue, Mar 21, 2017 at 1:42 PM William Burns <[hidden email]> wrote:
On Tue, Mar 21, 2017 at 12:53 PM Radim Vansa <[hidden email]> wrote:
On 03/21/2017 04:37 PM, William Burns wrote:
Some users have expressed the need to have some sort of forEach
operation that is performed where the Consumer is called while holding
the lock for the given key and subsequently released after the
Consumer operation completes.

Seconding Dan's question - is that intended to be able to modify the
entry? In my opinion, sending a function that will work on the
ReadWriteEntryView directly to the node is the only reasonable request.
I wouldn't like to see blocking operations in there.

Hrmm the user can use the FunctionalMap interface for this then it seems? I wonder if this should just be the going in API. I will need to discuss with Galder the semantics of the evalAll/evalMany methods.

Actually looking at evalAll it seems it doesn't scale as it keeps all entries in memory at once, so this is only for caches with a limited amount of entries.

^ I might be wrong but didn't forEach work this way? I probably looked at that when trying to implement evalAll

It actually is very similar to the distributed iterator. Basically it performs distributedBatchSize consumer calls, then tells the originator it has finished those keys (sending them back) and continues on. This way you can have at most distributedBatchSize * numNodes worth of more-than-once calls per state transfer. In practice this will be significantly lower, though, as you would only have a subset and you would have to lose complete ownership of said key.

While keys are sent across the wire, values are never returned with forEach. And key references are released when a segment is completed, so it should only hold a subset of keys in memory.
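The owner-side batching described here could be sketched roughly as follows. This is purely illustrative: the method names and the shape of the acknowledgment are assumptions for the sake of the example, not Infinispan internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of the owner-side batching described above: apply the Consumer to
// distributedBatchSize entries at a time, then report the completed keys back
// to the originator before moving on. Illustrative only, not Infinispan code.
class BatchedForEach {
    static <K> List<List<K>> forEachBatched(List<K> ownedKeys, int distributedBatchSize,
                                            Consumer<K> consumer) {
        List<List<K>> acks = new ArrayList<>();          // batches acknowledged to the originator
        List<K> batch = new ArrayList<>(distributedBatchSize);
        for (K key : ownedKeys) {
            consumer.accept(key);                        // user callback for this entry
            batch.add(key);
            if (batch.size() == distributedBatchSize) {
                acks.add(batch);                         // "these keys are done" message
                batch = new ArrayList<>(distributedBatchSize);
            }
        }
        if (!batch.isEmpty()) {
            acks.add(batch);                             // final partial batch
        }
        return acks;
    }
}
```

On a retry after state transfer, only keys not yet acknowledged would be re-processed, which is where the distributedBatchSize * numNodes bound above comes from.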
 







Re: [infinispan-dev] Stream operations under lock

William Burns-3
In reply to this post by Galder Zamarreño


On Mon, Mar 27, 2017 at 9:02 PM Galder Zamarreño <[hidden email]> wrote:

--
Galder Zamarreño
Infinispan, Red Hat

> On 21 Mar 2017, at 17:16, Dan Berindei <[hidden email]> wrote:
>
> I'm leaning towards option 1.
>
> Are you thinking about also allowing the consumer to modify the entry,
> like JCache's EntryProcessors? For a consumer that can only modify the
> current entry, we could even "emulate" locking in an optimistic cache
> by catching the WriteSkewException and running the consumer again.
>
> I wouldn't allow this to be mixed with other operations in a stream,
> because then you may have to run filters/mappers/sorting while holding
> the lock as well.

^ Would forEach w/ lock still run for all entries in the originator? If so, not being able to filter could be a pain. IOW, you'd be forcing all entries to be shipped to a node and the user to do their own filtering. Not ideal :\

No, the primary owner would run the operation per entry. I was thinking we would have 2 levels of filtering in my proposal above.

The first one uses filterKeys on the CacheStream method. This requires serializing the keys sent to each node (only primary-owned keys are sent). While it has the cost of serialization, it makes up for it by having constant-time lookups (no iterating memory/stores) for the keys, as it creates a stream using Cache.get to populate it.

The second is to support the filter method on the Stream API, which allows for a Predicate so you don't have to serialize keys. In this case you wouldn't want to include keys in this Predicate, as all keys would be serialized to all nodes and then you would still have to iterate and check the entire data container/store.

You could actually do both as well. So if you only want a subset of known keys where their values match a Predicate, this can be done too.

cache.lockedStream().filterKeys(keys).filter(predicate).forEach();
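As a stand-in illustration of those two levels (a plain Java Map instead of the cache; none of this is Infinispan API), filterKeys does a direct lookup per known key while a Predicate filter has to visit every entry:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Stand-in for the two filtering levels described above: filterKeys does a
// constant-time lookup per known key (no scan), while a Predicate filter has
// to iterate the whole data container. Plain Java Map, not Infinispan API.
class TwoLevelFilter {
    static <K, V> Map<K, V> filterKeys(Map<K, V> cache, Set<K> keys) {
        Map<K, V> result = new HashMap<>();
        for (K key : keys) {                           // direct get per key
            V value = cache.get(key);
            if (value != null) {
                result.put(key, value);
            }
        }
        return result;
    }

    static <K, V> Map<K, V> filter(Map<K, V> cache, Predicate<V> predicate) {
        Map<K, V> result = new HashMap<>();
        for (Map.Entry<K, V> e : cache.entrySet()) {   // full iteration of the container
            if (predicate.test(e.getValue())) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }
}
```

Chaining the two — key lookup first, predicate on the survivors — mirrors the combined filterKeys(keys).filter(predicate) usage above.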
 



Re: [infinispan-dev] Stream operations under lock

Sanne Grinovero-3
Hi Will,

I'm confused about the premise; when you state

" the Consumer is called while holding the lock for the given key and
subsequently released after the Consumer operation completes."

What are the transaction boundaries?
I see two options, please correct me so I understand:

A)

transaction.begin();
cache.lockedStream().filterKeys(keys).filter(predicate).forEach(
someOperation );
transaction.commit(); <-- release all locks

B)

cache.lockedStream().filterKeys(keys).filter(predicate).forEach(
    transaction.begin();
    // do stuff on entry
    transaction.commit(); <-- release this single entry's lock
);


I think it's important to clarify this, as I suspect that #A is not
implementable within reasonable guarantees, while in the case of #B I
see no use for optimistic locking *from a user's perspective*.

Also: what kind of guarantees do you expect about having the operation
being applied on some entry which is strictly *still* in the subset of the
keys as defined by the filter predicates?
I take it you'd want to acquire the locks during the filtering process?

That would require moving the transaction boundary to scenario #A,
which seems undesirable.
Technically, if I were to need something like this, I guess I'd expect
to have a user experience akin to #B, but have Infinispan essentially
use optimistic locking (and auto-skip) on entries which are mutated
and fall out of the filter predicate during the lock attempt.
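A rough sketch of that auto-skip idea, under loudly stated assumptions: a ConcurrentHashMap stands in for the cache, and replace() stands in for a versioned write-skew check; none of this is Infinispan API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch of #B with optimistic auto-skip: the consumer runs
// against the value read, and the result is validated with a compare-and-set.
// A concurrent update triggers a retry, which re-evaluates the predicate and
// skips the entry if it no longer matches. ConcurrentHashMap stands in for
// the cache and replace() for a write-skew/version check; not Infinispan API.
class OptimisticSkip {
    // Returns true if the consumer ran against a value that was still current
    // and still matched the filter; false if the entry was auto-skipped.
    static <K, V> boolean applyIfStillMatching(ConcurrentHashMap<K, V> cache, K key,
                                               Predicate<V> filter, Consumer<V> consumer) {
        for (;;) {
            V current = cache.get(key);
            if (current == null || !filter.test(current)) {
                return false;                 // mutated out of the filter: auto-skip
            }
            consumer.accept(current);         // optimistic work on the read value
            if (cache.replace(key, current, current)) {
                return true;                  // value unchanged while we worked
            }
            // concurrent update detected: loop and re-check the predicate
        }
    }
}
```

Note that, as with catching a write-skew exception and re-running, the consumer may execute more than once for a contended entry.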

Essentially I suspect that we'd not want to implement different
versions of this depending on the transaction mode, but figure out the
use case and implement one and only one transaction mode which suits
such use cases. So for example we'd simply not offer a mode which
requires copying the whole grid into the current TX context.

Thanks,
Sanne




Re: [infinispan-dev] Stream operations under lock

William Burns-3


On Tue, Mar 28, 2017 at 11:24 AM Sanne Grinovero <[hidden email]> wrote:
Hi Will,

I'm confused about the premise; when you state

" the Consumer is called while holding the lock for the given key and
subsequently released after the Consumer operation completes."

What are the transaction boundaries?
I see two options, please correct me so I understand:

A)

transaction.begin();
cache.lockedStream().filterKeys(keys).filter(predicate).forEach(
someOperation );
transaction.commit(); <-- release all locks

B)

cache.lockedStream().filterKeys(keys).filter(predicate).forEach(
    transaction.begin();
    // do stuff on entry
    transaction.commit(); <-- release this single entry's lock
);

The user code doesn't really affect this; it is done internally by Infinispan. We would acquire the lock on the user's behalf in the stream operation, then call the user's accept method. After it returns, we would unlock the lock. The way I have it implemented at the moment in my PoC (which could change) is that I don't even start a transaction. It just locks the key, and then if the user were to invoke an operation that requires a transaction, it adds the lock owner to their tx context at that point so they have ownership of the key.

Also note that a commit or rollback in the Consumer currently doesn't release the lock on the key, although this could be discussed and possibly changed.
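As a minimal simulation of those semantics (plain ReentrantLocks standing in for Infinispan's lock manager; a sketch, not the actual implementation):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;

// Minimal simulation of the semantics above: for each entry, the key's lock is
// acquired on the user's behalf, the Consumer runs, and the lock is released
// when accept() returns. Plain ReentrantLocks stand in for Infinispan's lock
// manager; this is a sketch, not the actual implementation.
class LockedForEach<K, V> {
    private final ConcurrentHashMap<K, V> data = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<K, ReentrantLock> locks = new ConcurrentHashMap<>();

    void put(K key, V value) {
        data.put(key, value);
    }

    void forEachWithLock(BiConsumer<Map<K, V>, Map.Entry<K, V>> consumer) {
        for (Map.Entry<K, V> entry : data.entrySet()) {
            ReentrantLock lock = locks.computeIfAbsent(entry.getKey(), k -> new ReentrantLock());
            lock.lock();                       // acquired before the user code runs
            try {
                consumer.accept(data, entry);  // user code runs while the key is locked
            } finally {
                lock.unlock();                 // released as soon as accept() returns
            }
        }
    }
}
```

Only one key lock is held at a time, which is why the deadlock risk discussed above only arises when the user's Consumer itself acquires additional locks.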
 


I think it's important to clarify this, as I suspect that #A is not
implementable within reasonable guarantees, while in the case of #B I
see no use for optimistic locking *from a user's perspective*.

Exactly my thoughts regarding optimistic. I don't think #A is even feasible given the constraints of having a distributed transaction like this.
 

Also: what kind of guarantees do you expect about having the operation
being applied on some entry which is strictly *still* in the subset of the
keys as defined by the filter predicates?
I take it you'd want to acquire the locks during the filtering process?

That is a good question. I hadn't thought about the details, but what I had implemented was: we first read the entry, lock the key, re-read the entry (in case of a concurrent update) and then finally call their Predicate. Another reason filterKeys is much more efficient :) Note this read, lock and re-read is done even without a Predicate supplied. And actually I can also optimize filterKeys to skip the initial read since we already have the key.
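That read / lock / re-read sequence could be sketched as below. Stand-in types only (ConcurrentHashMap and ReentrantLock in place of the cache and its lock manager); this is not the actual Infinispan code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Sketch of the sequence described above: read the entry, lock the key,
// re-read in case a concurrent update happened while acquiring the lock,
// then evaluate the user's Predicate (and Consumer) under the lock.
class ReadLockReread {
    static <K, V> boolean lockAndApply(ConcurrentHashMap<K, V> data,
                                       ConcurrentHashMap<K, ReentrantLock> locks,
                                       K key, Predicate<V> filter, Consumer<V> consumer) {
        V beforeLock = data.get(key);          // initial read (skippable with filterKeys)
        if (beforeLock == null) {
            return false;
        }
        ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
        lock.lock();
        try {
            V current = data.get(key);         // re-read: the value may have changed
            if (current != null && filter.test(current)) {   // Predicate runs under the lock
                consumer.accept(current);
                return true;
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

With filterKeys the initial read can be skipped entirely, as noted above, since the key is already known.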
 

That would require moving the transaction boundary to scenario #A,
which seems undesirable.
Technically, if I were to need something like this, I guess I'd expect
to have a user experience akin to #B, but have Infinispan essentially
use optimistic locking (and auto-skip) on entries which are mutated
and fall out of the filter predicate during the lock attempt.

Essentially I suspect that we'd not want to implement different
versions of this depending on the transaction mode, but figure out the
use case and implement a one and only transaction mode which suites
such use cases. So for example we'd simply not offer a mode which
requires to copy the whole grid into the current TX context.

This was never the intent, and in my follow-up emails I came to what seems like the same conclusion: basically this can't be done with the user controlling the transaction, and it doesn't really make sense in an optimistic transaction (since you are already at that node, you are already doing everything pessimistically).
 

> Thanks,
> Sanne



On 28 March 2017 at 14:49, William Burns <[hidden email]> wrote:
>
>
> On Mon, Mar 27, 2017 at 9:02 PM Galder Zamarreño <[hidden email]> wrote:
>>
>>
>> --
>> Galder Zamarreño
>> Infinispan, Red Hat
>>
>> > On 21 Mar 2017, at 17:16, Dan Berindei <[hidden email]> wrote:
>> >
>> > I'm leaning towards option 1.
>> >
>> > Are you thinking about also allowing the consumer to modify the entry,
>> > like JCache's EntryProcessors? For a consumer that can only modify the
>> > current entry, we could even "emulate" locking in an optimistic cache
>> > by catching the WriteSkewException and running the consumer again.
>> >
>> > I wouldn't allow this to be mixed with other operations in a stream,
>> > because then you may have to run filters/mappers/sorting while holding
>> > the lock as well.
>>
>> ^ Would forEach w/ lock still run for all entries in originator? If so,
>> not being able to filter could be a pain. IOW, you'd be forcing all entries
>> to be shipped to a node and user to do its own filtering. Not ideal :\
>
>
> No the primary owner would run the operation per entry. I was thinking we
> would have 2 levels of filtering in my proposal above.
>
> We would have the first one which is using filterKeys on the CacheStream
> method. This requires serializing keys sent to each node (only primary owned
> keys are sent). While it has the cost of serialization it makes up for by
> having constant time lookups (no iterating memory/stores) for the keys as it
> creates a stream using Cache.get to populate it.
>
> The second was to support the filter method on the Stream API which would
> allow for a Predicate so you don't have to serialize keys. In this case you
> wouldn't want to include keys in this Predicate as all keys would be
> serialized to all nodes and then you still have to iterate and check the
> entire data container/store.
>
> You could actually do both as well. So if you only want a subset of known
> keys where their values match a Predicate this can be done too.
>
> cache.lockedStream().filterKeys(keys).filter(predicate).forEach();
>
>>
>>
>>
>> >
>> > Cheers
>> > Dan

_______________________________________________
infinispan-dev mailing list
[hidden email]
https://lists.jboss.org/mailman/listinfo/infinispan-dev


Re: [infinispan-dev] Stream operations under lock

Dan Berindei
On Tue, Mar 28, 2017 at 6:44 PM, William Burns <[hidden email]> wrote:

>
>
> On Tue, Mar 28, 2017 at 11:24 AM Sanne Grinovero <[hidden email]>
> wrote:
>>
>> Hi Will,
>>
>> I'm confused about the premise; when you state
>>
>> " the Consumer is called while holding the lock for the given key and
>> subsequently released after the Consumer operation completes."
>>
>> What are the transaction boundaries?
>> I see two options, please correct me so I understand:
>>
>> A)
>>
>> transaction.begin();
>> cache.lockedStream().filterKeys(keys).filter(predicate).forEach(
>> someOperation );
>> transaction.commit(); <-- release all locks
>>
>> B)
>>
>> cache.lockedStream().filterKeys(keys).filter(predicate).forEach(
>>     transaction.begin();
>>     // do stuff on entry
>>     transaction.commit(); <-- release this single entry's lock
>> );
>
>
> The user code doesn't really affect this, it is done internally by
> Infinispan. We would acquire the lock on the user's behalf in the stream
> operation then call the user's accept method. Then after they return we
> would unlock the lock. The way I have it implemented at the moment in my PoC
> (which could change) is I don't even start a transaction. It just locks the
> key and then if the user were to invoke an operation that requires a
> transaction it adds the lock owner to their tx context at that point so they
> have ownership of the key.
>
> Also to note that a commit or rollback in the Consumer currently doesn't
> release the lock on the key. Although this could be discussed to be possibly
> changed.
>

With a transactional cache I was assuming you manage the transaction
yourself... if the user has to call
transactionManager.begin()/commit()/rollback() anyway, why not use a
regular stream?

>>
>>
>>
>> I think it's important to clarify this as I suspect that #A is not
>> implementable within reasonable guarantees, while in the case of B# I
>> see no use for optimistic locking *from a user's perspective*.
>
>
> Exactly my thoughts regarding optimistic. I don't think #A is even feasible
> given constraints of having a distributed transaction like this.
>

Totally agree that #A can't work with any kind of transaction configuration.

As to optimistic locking, I would like having "feature parity" between
pessimistic and optimistic caches as much as possible, but I agree
locking eagerly and retrying the consumer on WriteSkewException are a
bit too different to fit under the same API.
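The retry idea could be sketched with a compare-and-swap loop, where a failed CAS on a plain ConcurrentMap stands in for the WriteSkewException at commit time (all names here are illustrative, not Infinispan API):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.UnaryOperator;

// "Emulated" locking for an optimistic cache: run the consumer without a
// lock, and rerun it whenever a concurrent write invalidated its read
// (the failed replace() models a write-skew failure at commit time).
public class OptimisticRetrySketch {

    // Assumes the key is already present; a null value would need extra handling.
    public static String updateWithRetry(ConcurrentMap<String, String> map,
                                         String key, UnaryOperator<String> consumer) {
        while (true) {
            String before = map.get(key);          // optimistic read
            String after = consumer.apply(before); // consumer runs, no lock held
            if (map.replace(key, before, after)) {
                return after;                      // "commit" succeeded
            }                                      // else: write skew, retry
        }
    }

    public static String demo() {
        ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
        map.put("counter", "1");
        return updateWithRetry(map, "counter",
                v -> Integer.toString(Integer.parseInt(v) + 1));
    }
}
```

This is exactly the behavior that is hard to fit under the same API as eager locking: here the consumer may run more than once, whereas the pessimistic variant runs it exactly once under the lock.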

>>
>>
>> Also: what kind of guarantees do you expect about having the operation
>> being applied on some entry which is strictly *still* a subset of the
>> keys as defined by the filter predicates?
>> I take it you'd want to acquire the locks during the filtering process?
>
>
> That is a good question. I hadn't thought about the details but what I had
> implemented was we have to first read the entry, lock the key, reread the
> entry (in case of concurrent update) and then finally call their Predicate.
> Another reason the filterKeys is much more efficient :) Note this read,
> locking and reread is done even without a Predicate supplied. And actually I
> can also optimize filterKeys to not do the initial read since we already
> have the key.
>

Would this be more efficient than the consumer reading the key with
FORCE_WRITE_LOCK and deciding what to do based on the value?
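What that alternative would look like, roughly: the consumer does a get through AdvancedCache.withFlags(Flag.FORCE_WRITE_LOCK) inside a pessimistic transaction and decides based on the value. Modeled below with JDK locks; the class and method names are made up, and the only real API referenced is the flag itself:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Model of the FORCE_WRITE_LOCK pattern: a get that takes the key's write
// lock eagerly, with every lock held until the surrounding "transaction"
// commits. Contrast with the locked stream, which releases each key's lock
// as soon as its consumer returns.
public class ForceWriteLockSketch {

    private final Map<String, String> data = new ConcurrentHashMap<>();
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    private final Deque<ReentrantLock> held = new ArrayDeque<>();

    public void put(String key, String value) { data.put(key, value); }
    public String get(String key) { return data.get(key); }

    // Like advancedCache.withFlags(Flag.FORCE_WRITE_LOCK).get(key).
    public String getWithWriteLock(String key) {
        ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
        lock.lock();
        held.push(lock);
        return data.get(key);
    }

    // "Commit": only now are the accumulated locks released.
    public void commit() {
        while (!held.isEmpty()) {
            held.pop().unlock();
        }
    }

    public static String demo() {
        ForceWriteLockSketch cache = new ForceWriteLockSketch();
        cache.put("k", "old");
        String v = cache.getWithWriteLock("k"); // locked: safe to decide on v
        if ("old".equals(v)) {
            cache.put("k", "new");
        }
        cache.commit();
        return cache.get("k");
    }
}
```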

>>
>>
>> That would require the move the transaction boundary to the scenario
>> A# which seems undesirable.
>> Technically if I were to need something like this I guess I'd expect
>> to have a user experience akin to B# but have Infinispan essentially
>> use optimistic locking (and auto-skip) on entries which are mutated
>> and fall out of the filter predicate during the lock attempt.
>>
>> Essentially I suspect that we'd not want to implement different
>> versions of this depending on the transaction mode, but figure out the
>> use case and implement a one and only transaction mode which suites
>> such use cases. So for example we'd simply not offer a mode which
>> requires to copy the whole grid into the current TX context.
>
>
> This was never the intent and in my follow up emails I came to what seems
> like the same conclusion that basically this can't be done with the user
> controlling the transaction and it doesn't really make sense in an
> optimistic transaction (since you are already at that node, you are already
> doing everything pessimistically).
>

Even local caches can use optimistic locking :)

>>
>>
>> Thanks,
>> Sanne

_______________________________________________
infinispan-dev mailing list
[hidden email]
https://lists.jboss.org/mailman/listinfo/infinispan-dev

Re: [infinispan-dev] Stream operations under lock

William Burns-3


On Tue, Mar 28, 2017 at 12:52 PM Dan Berindei <[hidden email]> wrote:
On Tue, Mar 28, 2017 at 6:44 PM, William Burns <[hidden email]> wrote:
>
>
> On Tue, Mar 28, 2017 at 11:24 AM Sanne Grinovero <[hidden email]>
> wrote:
>>
>> Hi Will,
>>
>> I'm confused about the premise; when you state
>>
>> " the Consumer is called while holding the lock for the given key and
>> subsequently released after the Consumer operation completes."
>>
>> What are the transaction boundaries?
>> I see two options, please correct me so I understand:
>>
>> A)
>>
>> transaction.begin();
>> cache.lockedStream().filterKeys(keys).filter(predicate).forEach(
>> someOperation );
>> transaction.commit(); <-- release all locks
>>
>> B)
>>
>> cache.lockedStream().filterKeys(keys).filter(predicate).forEach(
>>     transaction.begin();
>>     // do stuff on entry
>>     transaction.commit(); <-- release this single entry's lock
>> );
>
>
> The user code doesn't really affect this, it is done internally by
> Infinispan. We would acquire the lock on the user's behalf in the stream
> operation then call the user's accept method. Then after they return we
> would unlock the lock. The way I have it implemented at the moment in my PoC
> (which could change) is I don't even start a transaction. It just locks the
> key and then if the user were to invoke an operation that requires a
> transaction it adds the lock owner to their tx context at that point so they
> have ownership of the key.
>
> Also to note that a commit or rollback in the Consumer currently doesn't
> release the lock on the key. Although this could be discussed to be possibly
> changed.
>

With a transactional cache I was assuming you manage the transaction
yourself... if the user has to call
transactionManager.begin()/commit()/rollback() anyway, why not use a
regular stream?

The user would be using either an implicit or an explicit transaction. Like I said, this is up for discussion. The main reason I am staying away from managing the transaction is that the user could mess with the transaction as well, which would possibly release the lock. It is much simpler if all I am managing is the lock. And if the user doesn't require a transaction in Infinispan, we don't waste time starting and releasing one.
 

>>
>>
>>
>> I think it's important to clarify this as I suspect that #A is not
>> implementable within reasonable guarantees, while in the case of B# I
>> see no use for optimistic locking *from a user's perspective*.
>
>
> Exactly my thoughts regarding optimistic. I don't think #A is even feasible
> given constraints of having a distributed transaction like this.
>

Totally agree that #A can't work with any kind of transaction configuration.

As to optimistic locking, I would like having "feature parity" between
pessimistic and optimistic caches as much as possible, but I agree
locking eagerly and retrying the consumer on WriteSkewException are a
bit too different to fit under the same API.

Yeah, I would definitely like to have it as well, but I just can't see how it fits in. And that's aside from the implementation detail that it is quite difficult to get working currently :D
 

>>
>>
>> Also: what kind of guarantees do you expect about having the operation
>> being applied on some entry which is strictly *still* a subset of the
>> keys as defined by the filter predicates?
>> I take it you'd want to acquire the locks during the filtering process?
>
>
> That is a good question. I hadn't thought about the details but what I had
> implemented was we have to first read the entry, lock the key, reread the
> entry (in case of concurrent update) and then finally call their Predicate.
> Another reason the filterKeys is much more efficient :) Note this read,
> locking and reread is done even without a Predicate supplied. And actually I
> can also optimize filterKeys to not do the initial read since we already
> have the key.
>

Would this be more efficient than the consumer reading the key with
FORCE_WRITE_LOCK and deciding what to do based on the value?


The problem with this is that you have to go remote to lock the key, return the value, and then do something for every key (not to mention pulling those keys using an iterator). Very costly! The main benefit of the stream with lock is that you are performing everything on the primary owner of the data with the lock already acquired. The only piece sent remotely is the consumer and some internal classes, which is very lightweight, and you have all the benefits of data locality.
 
>>
>>
>> That would require the move the transaction boundary to the scenario
>> A# which seems undesirable.
>> Technically if I were to need something like this I guess I'd expect
>> to have a user experience akin to B# but have Infinispan essentially
>> use optimistic locking (and auto-skip) on entries which are mutated
>> and fall out of the filter predicate during the lock attempt.
>>
>> Essentially I suspect that we'd not want to implement different
>> versions of this depending on the transaction mode, but figure out the
>> use case and implement a one and only transaction mode which suites
>> such use cases. So for example we'd simply not offer a mode which
>> requires to copy the whole grid into the current TX context.
>
>
> This was never the intent and in my follow up emails I came to what seems
> like the same conclusion that basically this can't be done with the user
> controlling the transaction and it doesn't really make sense in an
> optimistic transaction (since you are already at that node, you are already
> doing everything pessimistically).
>

Even local caches can use optimistic locking :)

Yes, I know :) I was just framing it in the notion of remote. If anyone can think of a nice way of using this with optimistic transactions I would be all for it. But optimistic transactions just don't make any sense to me when you are locking a key eagerly for someone to do something with it (the definition of a pessimistic transaction).
 

>>
>>
>> Thanks,
>> Sanne

_______________________________________________
infinispan-dev mailing list
[hidden email]
https://lists.jboss.org/mailman/listinfo/infinispan-dev
