[infinispan-dev] Distributed execution framework - update


Vladimir Blagojevic
I want to roughly outline what I believe needs to be done to implement a
basic distributed execution framework for 5.0 Final.

If you recall, a Distributed/MapReduce task is a logical unit of work
consisting of multiple distributed executables that are executed
individually across the Infinispan cluster. Each individual task execution
on the cluster is governed by failover, load balancing and execution
policies.

Failover policy

The failover policy regulates whether and how distributed task executables
are migrated to backup execution nodes in case of failure.

An executable can fail due to:

- exception raised in task implementation during execution
- node crash/leave
- migration failure to/from target execution node

Infinispan will invoke the failover mechanism in all of the above cases
except when the exception is raised by the task executable itself. In that
case the exception will be returned to the invoker of the distributed task,
who can act upon it.

By default, there will be two failover policies: failover off and failover
on. If failover is on, the load balancing policy in place will decide where
to migrate the task executable for execution. If failover is off, the task
invoker will be notified of the failure.
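
To make the two default policies concrete, here is a minimal sketch in Java.
All names in it (FailureKind, FailoverPolicy, FailoverPolicies) are
hypothetical and are not existing Infinispan API. Picking the first surviving
candidate is only a stand-in for whatever the load balancing policy would
actually decide.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical types for illustration only; not existing Infinispan API.
enum FailureKind { TASK_EXCEPTION, NODE_LEFT, MIGRATION_FAILED }

interface FailoverPolicy {
    // Returns the node to re-run the executable on, or empty if the
    // failure should simply be reported to the task invoker.
    Optional<String> failover(FailureKind kind, String failedNode, List<String> candidates);
}

final class FailoverPolicies {
    // Failover off: never migrate, always notify the invoker.
    static final FailoverPolicy OFF = (kind, failedNode, candidates) -> Optional.empty();

    // Failover on: migrate unless the task itself raised the exception.
    static final FailoverPolicy ON = (kind, failedNode, candidates) -> {
        if (kind == FailureKind.TASK_EXCEPTION) {
            return Optional.empty(); // exception goes back to the invoker
        }
        // Assumption: stand-in for the load balancing policy, which would
        // normally choose the target. Here: first node other than the failed one.
        return candidates.stream().filter(n -> !n.equals(failedNode)).findFirst();
    };
}
```

With nodes A, B and C, a crash of A under the ON policy yields B as the new
target, while a task exception yields no target under either policy.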


Load balancing policy

The load balancing policy decides how distributed task executables are
dispersed for execution around the Infinispan cluster. By default, a
data-collocating load balancing policy is used whenever a distributed task
is invoked on a set of keys in the cache. Other, simpler load balancing
policies can be implemented as well if the need arises.
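
As a rough illustration of data collocation, the sketch below groups input
keys by the node that owns them, so that each executable runs next to its
data. CollocatingLoadBalancer and the primaryOwner lookup function are
hypothetical; in a real cluster the owner lookup would come from the cache's
key distribution, which is not modelled here.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical helper for illustration only; not existing Infinispan API.
final class CollocatingLoadBalancer {
    // Builds an execution plan: node name -> keys that node should process.
    // primaryOwner is an assumed lookup from key to owning node.
    static <K> Map<String, List<K>> assign(Collection<K> keys, Function<K, String> primaryOwner) {
        Map<String, List<K>> plan = new TreeMap<>(); // sorted by node name for stable output
        for (K key : keys) {
            plan.computeIfAbsent(primaryOwner.apply(key), node -> new ArrayList<>()).add(key);
        }
        return plan;
    }
}
```

Each entry of the resulting plan corresponds to one executable sent to one
node, carrying only the keys that live there.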



Execution policy

The execution policy decides how a task executable is executed once it has
been migrated to an execution node. By default, a priority queue is used for
queuing task executables. Users can, if needed, fine-tune task priority on a
per-task basis. If priority is left unchanged for all tasks, then all their
executables are effectively queued FIFO on the execution nodes.
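
One detail worth sketching: java.util.concurrent.PriorityBlockingQueue does
not guarantee any particular order among elements of equal priority, so the
"effectively FIFO" behaviour needs an explicit tie-breaker such as a sequence
number. The classes below (QueuedExecutable, ExecutionQueue) are hypothetical
and only show that idea; they are not a proposed API.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical types for illustration only; not existing Infinispan API.
final class QueuedExecutable implements Comparable<QueuedExecutable> {
    private static final AtomicLong SEQUENCE = new AtomicLong();

    final String name;
    final int priority;                           // higher value runs first
    final long seq = SEQUENCE.getAndIncrement();  // arrival order, used as tie-breaker

    QueuedExecutable(String name, int priority) {
        this.name = name;
        this.priority = priority;
    }

    @Override
    public int compareTo(QueuedExecutable other) {
        int byPriority = Integer.compare(other.priority, priority);
        // Equal priority: earlier arrival first, which gives FIFO behaviour.
        return byPriority != 0 ? byPriority : Long.compare(seq, other.seq);
    }
}

final class ExecutionQueue {
    static final int DEFAULT_PRIORITY = 0;
    private final PriorityBlockingQueue<QueuedExecutable> queue = new PriorityBlockingQueue<>();

    void submit(String name) { submit(name, DEFAULT_PRIORITY); }

    void submit(String name, int priority) { queue.add(new QueuedExecutable(name, priority)); }

    // Returns the name of the next executable to run, or null if the queue is empty.
    String next() {
        QueuedExecutable e = queue.poll();
        return e == null ? null : e.name;
    }
}
```

Submitting everything with the default priority drains in submission order;
raising the priority of one executable moves it ahead of the rest.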

Time permitting, a job stealing policy should be implemented, taking ideas
from the fork/join framework and applying them in a distributed fashion
amongst Infinispan nodes.


Implementation sketches


In order to implement distributed/MapReduce task execution, I believe we
should reuse existing Infinispan infrastructure (marshalling, remote command
invocation, interceptor chain, thread pools) as much as possible.

When a user submits a distributed task, we would locate the Infinispan nodes
where the input keys are located and send executables
(DistributedCallable/Mapper/Reducer) to those nodes using the existing
remote command invocation mechanism. The decision about where to migrate
executables is effectively made by the load balancing policy, the default
one being the collocating policy.

When executables wrapped into commands arrive at Infinispan nodes, they are
handed off to a special handling object (the execution policy) rather than
the invocation handler. The execution policy interacts with the execution
container and in turn queues and monitors executables as they are executed
in the container's thread pool. DistributedCallable(s) are invoked and their
results returned to the invoking node. Mappers are invoked as well, and
their results are handed off to Reducers as described in the MapReduce
algorithm. Eventually the result of each Reducer is also returned to the
task invoker, where the Collator is invoked.
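
The Mapper, Reducer and Collator hand-off described above can be sketched in
a single JVM as follows. The interfaces here are deliberately simplified and
hypothetical; the signatures that end up in Infinispan may well differ, and
nothing is distributed in this sketch. It only shows the order of the three
phases.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical interfaces; not the actual Infinispan signatures.
interface Mapper<V, K2, V2> { void map(V value, Map<K2, List<V2>> collector); }
interface Reducer<K2, V2> { V2 reduce(K2 key, List<V2> values); }
interface Collator<K2, V2, R> { R collate(Map<K2, V2> reducedResults); }

final class MapReduceSketch {
    static <V, K2, V2, R> R run(List<V> inputs, Mapper<V, K2, V2> mapper,
                                Reducer<K2, V2> reducer, Collator<K2, V2, R> collator) {
        // Map phase: every input emits intermediate values grouped by key.
        Map<K2, List<V2>> intermediate = new HashMap<>();
        for (V value : inputs) {
            mapper.map(value, intermediate);
        }
        // Reduce phase: one reduced value per intermediate key.
        Map<K2, V2> reduced = new HashMap<>();
        for (Map.Entry<K2, List<V2>> e : intermediate.entrySet()) {
            reduced.put(e.getKey(), reducer.reduce(e.getKey(), e.getValue()));
        }
        // Collate phase: runs on the invoker and produces the final result.
        return collator.collate(reduced);
    }

    // Example task: the number of occurrences of the most frequent word.
    static int highestWordCount(List<String> lines) {
        Mapper<String, String, Integer> mapper = (line, out) -> {
            for (String word : line.split(" ")) {
                out.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
            }
        };
        Reducer<String, Integer> reducer = (word, ones) -> {
            int sum = 0;
            for (int one : ones) sum += one;
            return sum;
        };
        Collator<String, Integer, Integer> collator = reduced -> {
            int max = 0;
            for (int count : reduced.values()) max = Math.max(max, count);
            return max;
        };
        return run(lines, mapper, reducer, collator);
    }
}
```

In the distributed version the map and reduce phases would run on the nodes
holding the input keys, and only the collate step would stay with the task
invoker.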

In case of task failure due to an exception raised in the task itself, the
exception is returned to the task submitter. In other cases, the failover
policy along with the load balancing policy decides how to migrate the
executable to other Infinispan nodes.


If you think that I omitted something and/or have suggestions, let me know.

Regards,
Vladimir




_______________________________________________
infinispan-dev mailing list
[hidden email]
https://lists.jboss.org/mailman/listinfo/infinispan-dev
Re: [infinispan-dev] Distributed execution framework - update

Manik Surtani

On 1 Mar 2011, at 18:07, Vladimir Blagojevic wrote:

> I want to roughly outline what I believe needs to be done to implement a
> basic distributed execution framework for 5.0 Final.
>
> If you recall, a Distributed/MapReduce task is a logical unit of work
> consisting of multiple distributed executables that are executed
> individually across the Infinispan cluster. Each individual task execution
> on the cluster is governed by failover, load balancing and execution
> policies.
>
> Failover policy
>
> The failover policy regulates whether and how distributed task executables
> are migrated to backup execution nodes in case of failure.
>
> An executable can fail due to:
>
> - exception raised in task implementation during execution
> - node crash/leave
> - migration failure to/from target execution node
>
> Infinispan will invoke the failover mechanism in all of the above cases
> except when the exception is raised by the task executable itself. In that
> case the exception will be returned to the invoker of the distributed task,
> who can act upon it.
>
> By default, there will be two failover policies: failover off and failover
> on. If failover is on, the load balancing policy in place will decide where
> to migrate the task executable for execution. If failover is off, the task
> invoker will be notified of the failure.

Hmm - tasks may fail for various reasons, including a badly written task which may always fail.  Maybe all you need is a simple retry count?
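
A bounded retry count along these lines could look roughly like the sketch
below. RetryingExecutor and callWithRetries are hypothetical names, and the
sketch retries locally on any exception purely to show the counting; where
the retry would run, and which failures qualify, is left open here.

```java
import java.util.concurrent.Callable;

// Hypothetical helper for illustration only; not existing Infinispan API.
final class RetryingExecutor {
    // Calls the task up to maxAttempts times and returns the first successful
    // result. If every attempt fails, the last exception is rethrown, so a task
    // that always fails stops after a bounded number of tries.
    static <T> T callWithRetries(Callable<T> task, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }
}
```

A task that fails twice and then succeeds returns normally with a limit of
five attempts; a task that always fails is given up on after the limit is
reached.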

> Load balancing policy
>
> The load balancing policy decides how distributed task executables are
> dispersed for execution around the Infinispan cluster. By default, a
> data-collocating load balancing policy is used whenever a distributed task
> is invoked on a set of keys in the cache. Other, simpler load balancing
> policies can be implemented as well if the need arises.

I don't understand - I thought tasks are executed on all data owners (if any keys are provided) or on all nodes if no keys are provided.  What other "load balancing" policies do you foresee?  :-)

If it doesn't make sense maybe it would be better to remove load balancing from the API for now, add it later if we see a need?  

> Execution policy
>
> The execution policy decides how a task executable is executed once it has
> been migrated to an execution node. By default, a priority queue is used for
> queuing task executables. Users can, if needed, fine-tune task priority on a
> per-task basis. If priority is left unchanged for all tasks, then all their
> executables are effectively queued FIFO on the execution nodes.
>
> Time permitting, a job stealing policy should be implemented, taking ideas
> from the fork/join framework and applying them in a distributed fashion
> amongst Infinispan nodes.

I would say leave prioritising out for now.  Let's keep this simple.  This is all stuff that can be added later if needed.

> Implementation sketches
>
>
> In order to implement distributed/MapReduce task execution, I believe we
> should reuse existing Infinispan infrastructure (marshalling, remote command
> invocation, interceptor chain, thread pools) as much as possible.
>
> When a user submits a distributed task, we would locate the Infinispan nodes
> where the input keys are located and send executables
> (DistributedCallable/Mapper/Reducer) to those nodes using the existing
> remote command invocation mechanism. The decision about where to migrate
> executables is effectively made by the load balancing policy, the default
> one being the collocating policy.
>
> When executables wrapped into commands arrive at Infinispan nodes, they are
> handed off to a special handling object (the execution policy) rather than
> the invocation handler.

Why?  If you are creating a special Command for this, the Command's perform() method could handle execution.  Saves you having to re-engineer the InvocationHandler (which is very complex as it is).

> The execution policy interacts with the execution container and in turn
> queues and monitors executables as they are executed in the container's
> thread pool. DistributedCallable(s) are invoked and their results returned
> to the invoking node. Mappers are invoked as well, and their results are
> handed off to Reducers as described in the MapReduce algorithm. Eventually
> the result of each Reducer is also returned to the task invoker, where the
> Collator is invoked.
>
> In case of task failure due to an exception raised in the task itself, the
> exception is returned to the task submitter.

Yes, the Command's perform() method would just have to throw an exception.  The InvocationHandler will wrap it in an appropriate ExceptionResponse to be sent back to the caller.
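
A stripped-down illustration of that idea: the command runs the callable in
its own perform() method, and a handler turns either the return value or the
thrown exception into a response for the caller. Every type below
(SketchCommand, ExecuteCallableCommand, SketchResponse,
SketchInvocationHandler) is a hypothetical stand-in written for this sketch;
none of them is the real Infinispan command, handler or response class.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-ins for illustration only; not the real Infinispan classes.
interface SketchCommand {
    Object perform() throws Exception;
}

// The command itself executes the user's callable; no special handler needed.
final class ExecuteCallableCommand implements SketchCommand {
    private final Callable<?> callable;

    ExecuteCallableCommand(Callable<?> callable) {
        this.callable = callable;
    }

    @Override
    public Object perform() throws Exception {
        return callable.call(); // a task exception simply propagates
    }
}

// Carries either a result or the exception back to the caller.
final class SketchResponse {
    final Object value;
    final Exception failure;

    private SketchResponse(Object value, Exception failure) {
        this.value = value;
        this.failure = failure;
    }

    static SketchResponse success(Object value) { return new SketchResponse(value, null); }

    static SketchResponse exception(Exception failure) { return new SketchResponse(null, failure); }

    boolean isSuccessful() { return failure == null; }
}

final class SketchInvocationHandler {
    // Generic handling: run the command and wrap whatever happens.
    static SketchResponse handle(SketchCommand command) {
        try {
            return SketchResponse.success(command.perform());
        } catch (Exception e) {
            return SketchResponse.exception(e);
        }
    }
}
```

The handler stays generic here: it knows nothing about distributed tasks, and
the task-specific work lives entirely in the command.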

> In other cases, the failover policy along with the load balancing policy
> decides how to migrate the executable to other Infinispan nodes.

--
Manik Surtani
[hidden email]
twitter.com/maniksurtani

Lead, Infinispan
http://www.infinispan.org




Re: [infinispan-dev] Distributed execution framework - update

Vladimir Blagojevic
Agreed on all points. Let's keep it simple and implement the defaults;
later on, if the need arises, we can standardize APIs for components where
interchangeable implementations could be plugged in.

On 11-03-02 5:46 AM, Manik Surtani wrote:

