[infinispan-dev] Distributed execution framework - API proposal(s)

[infinispan-dev] Distributed execution framework - API proposal(s)

Vladimir Blagojevic
Hey,

I spent the last week working on concrete API proposals for the distributed
execution framework. I believe we are close to finalizing the proposal,
so your input and feedback is important now! Here are the main ideas
where I think we have made progress since we last talked.


Access to multiple caches during task execution

While we have agreed to allow access to multiple caches during task
execution, including this logic in the task API complicates it greatly.
The compromise I found is to focus the whole API on one specific cache
but allow access to other caches through the DistributedTaskContext API.
Focusing on one specific cache and its input keys will allow us to
properly map task units across the Infinispan cluster using consistent
hashing (CH), and will cover most of the use cases. A
DistributedTaskContext can also easily be mapped to a single cache. See
DistributedTask and DistributedTaskContext for more details.


DistributedTask and DistributedCallable

I found it useful to separate general task characteristics from the
details of the actual work/computation. The main task characteristics
are therefore specified through the DistributedTask API, while the
details of the actual computation are specified through the
DistributedCallable API. DistributedTask specifies coarse task details
such as the failover policy, the task splitting policy, the cancellation
policy and so on, while in the DistributedCallable API implementers
focus on the actual details of a computation/work unit.
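To make that separation concrete, here is a rough sketch of how the two interfaces might divide responsibilities. The method signatures, the stub DistributedTaskContext, and the ValueLengthSum example are my own illustration of the idea, not the actual code in the branches:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

public class DistExecSketch {

    // Per-node view of the primary cache's data (hypothetical shape).
    interface DistributedTaskContext<K, V> {
        V get(K key);
    }

    // The computation unit: a Callable that is handed its local context
    // by the framework before execution.
    interface DistributedCallable<K, V, T> extends Callable<T> {
        void mapped(DistributedTaskContext<K, V> ctx);
    }

    // Example work unit: sums the lengths of the values for its input keys.
    static class ValueLengthSum implements DistributedCallable<String, String, Integer> {
        private DistributedTaskContext<String, String> ctx;
        private final String[] keys;

        ValueLengthSum(String... keys) { this.keys = keys; }

        public void mapped(DistributedTaskContext<String, String> ctx) { this.ctx = ctx; }

        public Integer call() {
            int sum = 0;
            for (String k : keys) sum += ctx.get(k).length();
            return sum;
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for what the framework would do on each node:
        // back the context with the node's local data, then run the callable.
        Map<String, String> local = new HashMap<>();
        local.put("a", "hello");
        local.put("b", "world!");
        ValueLengthSum task = new ValueLengthSum("a", "b");
        task.mapped(local::get);
        System.out.println(task.call()); // prints 11
    }
}
```

A DistributedTask would then wrap such a callable together with the coarse policies (failover, splitting, cancellation) without touching the computation itself.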


I have updated the original document [1] to reflect the API changes. You
can see the actual proposal in git [2]. I have also included a variation
of this approach [3] that separates the map and reduce task phases into
separate interfaces and removes the DistributedCallable interface, and I
have kept Trustin's ideas in another proposal [4] since I would like to
include them as well if possible.

Regards,
Vladimir


[1] http://community.jboss.org/wiki/InfinispanDistributedExecutionFramework
[2] https://github.com/vblagoje/infinispan/tree/t_ispn-39_master_prop1
[3] https://github.com/vblagoje/infinispan/tree/t_ispn-39_master_prop2
[4] https://github.com/vblagoje/infinispan/tree/t_ispn-39_master_prop3



_______________________________________________
infinispan-dev mailing list
[hidden email]
https://lists.jboss.org/mailman/listinfo/infinispan-dev

Re: [infinispan-dev] Distributed execution framework - API proposal(s)

Galder Zamarreño

On Dec 27, 2010, at 5:25 PM, Vladimir Blagojevic wrote:

> Access to multiple caches during task execution
>
> While we have agreed to allow access to multiple caches during task
> execution including this logic into task API complicates it greatly. The
> compromise I found is to focus all API on to a one specific cache but
> allow access to other caches through DistributedTaskContext API. The
> focus on one specific cache and its input keys will allows us to
> properly CH map task units across Infinispan cluster and will cover most
> of the use cases. DistributedTaskContext can also easily be mapped to a
> single cache. See DistributedTask and DistributedTaskContext for more
> details.

Maybe I'm reading this wrong, but are you saying that multiple caches cause a problem with mapping task units to nodes in the cluster?

Or are you just doing it to avoid cluttering the API?

I think DistributedTaskContext extending CacheContainer is rather confusing, particularly since DistributedTaskContext has K,V parameters that are generally associated with Cache rather than CacheContainer.

Also, why is the context iterable? Does it iterate the contents of a CacheContainer? extends generally implies an "is-a" relationship: AFAIK you'd be able to iterate a Map or a Cache, but not a CacheContainer.

>
>
> DistributedTask and DistributedCallable
>
> I found it useful to separate task characteristics in general and actual
> work/computation details. Therefore the main task characteristics are
> specified through DistributedTask API and details of actual task
> computation are specified through DistributedCallable API.
> DistributedTask specifies coarse task details, the failover policy, the
> task splitting policy, cancellation policy and so on while in
> DistributedCallable API implementers focus on actual details of a
> computation/work unit.

Personally, I think DistributedTask has too many generics (K, V, T, R) and is hard to read. IMO, only T and R should exist. I would also try to stick to the Callable convention of taking a V.

I don't like to see things like this; it reminds me of EJB 2.1, where you were forced to implement a method simply to get hold of a ctx. There are much nicer ways to do this, if it's necessary at all (see EJB3):

      @Override
      public void mapped(DistributedTaskContext<String, String> ctx) {
         this.ctx = ctx;
      }

Looking at the example provided, it seems to me that all the DistributedTaskContext is used for is navigating the Cache contents from a user-defined callable, in which case I would limit its scope.

Finally, what is the aim of the write() methods in DistributedTaskContext? I don't see them used in the given example. If they are not meant to be used by the user and exist only for internal framework use, I would not leak them.
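For what "nicer ways (see EJB3)" could look like here: a sketch of annotation-driven injection, where the callable just declares a field and the framework wires it up before execution. The @TaskContext annotation and the inject() helper are entirely hypothetical, invented for illustration:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

public class InjectionSketch {

    // Hypothetical marker annotation; nothing like it exists in the proposal.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface TaskContext {}

    interface DistributedTaskContext<K, V> { V get(K key); }

    // EJB3-style: no mapped() callback to implement, just declare what you need.
    static class MyCallable {
        @TaskContext
        DistributedTaskContext<String, String> ctx;
    }

    // What the framework would do reflectively before running the callable.
    static void inject(Object target, DistributedTaskContext<?, ?> ctx) {
        try {
            for (Field f : target.getClass().getDeclaredFields()) {
                if (f.isAnnotationPresent(TaskContext.class)) {
                    f.setAccessible(true);
                    f.set(target, ctx);
                }
            }
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MyCallable c = new MyCallable();
        DistributedTaskContext<String, String> demoCtx = key -> "value-for-" + key;
        inject(c, demoCtx);
        System.out.println(c.ctx.get("k")); // prints value-for-k
    }
}
```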


--
Galder Zamarreño
Sr. Software Engineer
Infinispan, JBoss Cache



Re: [infinispan-dev] Distributed execution framework - API proposal(s)

Vladimir Blagojevic
On 11-01-03 6:16 AM, Galder Zamarreño wrote:
> Maybe I'm reading this wrong but are you saying that multiple caches cause problem with mapping of task units to nodes in cluster?
>
> Or are you just doing it not to clutter the API?

Clutter of the API. If you did not like K,V,T,R, imagine dealing with
the confusion of multiple caches! It would be horrible.

> I think DistributedTaskContext extending CacheContainer is rather confusing, particularly when DistributedTaskContext has K,V parameters that generally are associated with Cache rather than CacheContainer.

Yes, true, but DistributedTaskContext is primarily geared towards one
cache while also providing the opportunity to read data from other
caches; hence K,V for the primary cache. Any suggestions on how to deal
with this more elegantly? Maybe pass DistributedTaskContext and
CacheContainer as separate parameters?


> Also, why is a context iterable? Iterates the contents of a CacheContainer? extends generally means that "is something". AFAIK, you'd be able to iterate a Map or Cache, but not a CacheContainer.

True.

> Personally, I think DistributedTask has too many generics (K, V, T, R) and it's hard to read. IMO, only T and R should only exist. I would also try to stick to Callable conventions that takes a V.
>
> I don't like to see things like this, reminds me of EJB 2.1 where you were forced to implement a method to simply get hold of a ctx. There're much nicer ways to do things like this, if completely necessary (see EJB3) :

You mean injection? Proposal 2 essentially does this.

>        @Override
>        public void mapped(DistributedTaskContext<String, String>  ctx) {
>           this.ctx = ctx;
>        }
>
> Looking at the example provided, it seems to me that all DistributedTaskContext is used for is to navigate the Cache contents from a user defined callable, in which case I would limit its scope.

What do you mean - "limit its scope"?


Re: [infinispan-dev] Distributed execution framework - API proposal(s)

Eduardo Martins
Why not?

public interface DistributedTaskContext {

   <K, V> Cache<K, V> getCache(String cacheName);

   Address getExecutionNode();

}
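A toy version of that shape (Cache and Address are Infinispan's types; the MapCache and SimpleContext below are invented stand-ins) shows why putting the type parameters on the method rather than the interface works: one context can hand back differently typed caches, at the cost of an unchecked cast inside the implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class ContextSketch {

    // Toy stand-in for Infinispan's Cache.
    interface Cache<K, V> { V get(K key); void put(K key, V value); }

    static class MapCache<K, V> implements Cache<K, V> {
        private final Map<K, V> data = new HashMap<>();
        public V get(K key) { return data.get(key); }
        public void put(K key, V value) { data.put(key, value); }
    }

    // Eduardo's shape: type parameters on the method, not on the interface.
    interface DistributedTaskContext {
        <K, V> Cache<K, V> getCache(String cacheName);
    }

    static class SimpleContext implements DistributedTaskContext {
        private final Map<String, Cache<?, ?>> caches = new HashMap<>();
        void register(String name, Cache<?, ?> cache) { caches.put(name, cache); }

        @SuppressWarnings("unchecked") // caller asserts K,V; the registry can't check it
        public <K, V> Cache<K, V> getCache(String name) {
            return (Cache<K, V>) caches.get(name);
        }
    }

    public static void main(String[] args) {
        SimpleContext ctx = new SimpleContext();
        MapCache<String, Integer> counts = new MapCache<>();
        counts.put("hits", 42);
        ctx.register("counts", counts);

        // The caller picks K and V per call; no K,V clutter on the context itself.
        Cache<String, Integer> c = ctx.getCache("counts");
        System.out.println(c.get("hits")); // prints 42
    }
}
```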

-- Eduardo
..............................................
http://emmartins.blogspot.com
http://redhat.com/solutions/telco



Re: [infinispan-dev] Distributed execution framework - API proposal(s)

Vladimir Blagojevic
On 11-01-03 4:30 PM, Eduardo Martins wrote:

> Why not?
>
> public interface DistributedTaskContext {
>
>     <K, V>  Cache<K, V>  getCache(String cacheName);
>
>     Address getExecutionNode();
>
> }
>
> -- Eduardo
> ..............................................
> http://emmartins.blogspot.com
> http://redhat.com/solutions/telco
>
Yes, that seems like a good approach. Galder already suggested this
instead of extending CacheContainer, and I have corrected the original
proposal.

Cheers,
Vladimir

Re: [infinispan-dev] Distributed execution framework - API proposal(s)

Manik Surtani
In reply to this post by Vladimir Blagojevic
Hmm - I'm looking at your prop-1 branch and it's somewhat confusing.  Is this the correct, up-to-date branch?





--
Manik Surtani
[hidden email]
twitter.com/maniksurtani

Lead, Infinispan
http://www.infinispan.org





Re: [infinispan-dev] Distributed execution framework - API proposal(s)

Manik Surtani
In reply to this post by Vladimir Blagojevic
Also, I think we need to be clear about these two (map and reduce) functions.  Map doesn't mean "pick a node to run the task on" in map/reduce speak; map means select/transform data for inclusion in a result set.  Perhaps it also makes sense to use smaller/simpler interfaces.  I know this breaks away from the F/J API, but I'm beginning to wonder whether there was a proper alignment of purpose there in the first place - going back on my original plans here.  How's this for an alternate API:

interface Mapper<K, V, T> {
        // just "maps" entries on a remote node.  Map = filter and transform.  Invoked once for each entry on a remote cache.
        // null responses are ignored/filtered out.
        T map(K key, V value);
}

interface Reducer<T, R> {
        // incrementally reduces a transformed entry.  Called once for each T produced by the mapper.
        // the previously reduced value is passed in each time.
        R reduce(T transformed, R previouslyReduced);
}

interface Collator<R> {
        // adds reduced results from remote nodes.  Called once for each R returned by a remote Reducer.
        void add(Address origin, R remote);

        // collates all results added so far.
        R collate();
}


And the API could do something like

MapReduceContext c = new MapReduceContext(cache);

// 1) distributes 'mapper' cluster-wide.  Calls mapper.map() for each K/V pair, storing the result T of each invocation if T != null.
// 2) For each T, reducer.reduce() is called.  Each time, the previous value of R is passed back in to reduce().
// 3) The final value of R is sent back as the RPC result.  For each result, the origin address and R are passed to collator.add().
// 4) Once all remote RPCs have responded, collator.collate() is called and the result is passed back to the caller.
R r = c.invoke(mapper, reducer, collator);

Variants may include:

Filtering nodes:
// restricts the set of nodes where RPCs are sent, based on the subset of the cluster that contains one or more of K.
// question: does this mean only K/V pairs that are in K... are passed in to the mapper?
R r = c.invoke(mapper, reducer, collator, K...);

Using futures:
NotifyingFuture<R> f = c.invokeFuture(mapper, reducer, collator);

Example: implementing a word count, but only for keys that start with "text":

Mapper<String, String, Integer> mapper = new Mapper<String, String, Integer>() {
    public Integer map(String k, String v) {
        return k.startsWith("text") ? v.length() : null;
    }
};

Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>() {
    public Integer reduce(Integer transformed, Integer prevReduced) { return transformed + prevReduced; }
};

Collator<Integer> collator = new Collator<Integer>() {
    int collated = 0;
    public void add(Address origin, Integer result) { collated += result; }
    public Integer collate() { return collated; }
};
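To see the three hooks compose end to end, here is a self-contained simulation of the flow steps 1-2 describe, run over a plain local map as a single node would run it. Only the Mapper/Reducer shapes come from the proposal above; the mapReduceLocally() driver is imagined scaffolding standing in for the framework:

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {

    interface Mapper<K, V, T> { T map(K key, V value); }
    interface Reducer<T, R> { R reduce(T transformed, R previouslyReduced); }

    // Imagined driver: what one node would do with its local entries before
    // shipping the reduced R back to the caller's collator.
    static <K, V, T, R> R mapReduceLocally(Map<K, V> data, Mapper<K, V, T> m,
                                           Reducer<T, R> r, R initial) {
        R reduced = initial;
        for (Map.Entry<K, V> e : data.entrySet()) {
            T t = m.map(e.getKey(), e.getValue());
            if (t != null) reduced = r.reduce(t, reduced); // null = filtered out
        }
        return reduced;
    }

    public static void main(String[] args) {
        Map<String, String> node = new HashMap<>();
        node.put("text1", "hello");
        node.put("text2", "world");
        node.put("binary1", "ignored");

        // Character count, but only for keys starting with "text".
        Mapper<String, String, Integer> mapper =
            (k, v) -> k.startsWith("text") ? v.length() : null;
        Reducer<Integer, Integer> reducer = (t, prev) -> t + prev;

        System.out.println(mapReduceLocally(node, mapper, reducer, 0)); // prints 10
    }
}
```

The caller-side collator would then be the same kind of fold again, once per responding node.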


WDYT?  :-)

Cheers
Manik









Re: [infinispan-dev] Distributed execution framework - API proposal(s)

Sanne Grinovero

As a data-oriented API, why should I deal with details such as Address? Could we avoid that?
Also, I didn't see a Collator in the other examples; I've never used M/R seriously, so I might have forgotten the more complex examples, but my impression was that problems should be expressed entirely in terms of Mapper and Reducer only.

Sanne




Re: [infinispan-dev] Distributed execution framework - API proposal(s)

Manik Surtani-2
The Address is purely additional info, in case the collator cares to filter out results from certain nodes.

And the whole concept of the collator is just another reduce - I couldn't think of a better name. It often makes sense to reduce once remotely and then once again locally.

Hope this makes sense. :)

Sent from my iPhone



Re: [infinispan-dev] Distributed execution framework - API proposal(s)

Vladimir Blagojevic
In reply to this post by Manik Surtani
Manik,

Of course we could go in this direction as well. It is very similar to
the Hadoop approach and close to the original map/reduce paradigm. I
intentionally changed this paradigm into a simpler one because I had
read a lot of criticism about how hard it is for "regular" developers to
adapt to the original map/reduce paradigm - hence the simpler approach
of mapping runnables to execution nodes and collating results, which I
unfortunately named map and reduce as well.

Does anyone else have an opinion while I think about this a bit more?

Regards,
Vladimir




On 11-01-04 3:47 PM, Manik Surtani wrote:

> Also, I think we need to be clear about these 2 (map and reduce) functions.  Map doesn't mean "pick node to run task on" in map/reduce speak.  Map means select /transform data for inclusion into a result set.  Perhaps it also makes sense to use smaller/simpler interfaces.  I know this breaks away from the F/J API, but I'm beginning to wonder if there is a proper alignment of purpose here in the first place - going back on my original plans here.  How's this for an alternate API:
>
> Mapper<K, V, T>  {
> // just "maps" entries on a remote node.  Map = filter and transform.  Invoked once for each entry on a remote cache.
> // null responses are ignored/filtered out.
> T map(K, V);
> }
>
> Reducer<T, R>  {
> // incrementally reduces a transformed entry.  Called once for each T produced by the mapper.
> // previously reduced value passed in each time.
> R reduce(T, R);
> }
>
> Collator<R>  {
> // Adds reduced results from remote nodes.  Called once for each R returned by a RemoteReducer.
> add(Address origin, R remote);
>
> // collates all results added so far.
> R collate();
> }
>
>
> And the API could do something like
>
> MapReduceContext c = new MapReduceContext(cache);
>
> // 1) distributes 'mapper' cluster wide.  Calls mapper.map() for each K/V pair.  Stores result T for each invocation if T != null.
> // 2) For each T, reducer.reduce() is called.  Each time, the previous value of R is passed back in to reduce().
> // 3) Final value of R is sent back as a RPC result.  For each result, address and R is passed collator.add()
> // 4) Once all remote RPCs have responded, collator.collate() is called, pass result back to caller.
> R r = c.invoke(mapper, reducer, collator);
>
> Variants may include:
>
> Filtering nodes:
> // restricts the set of nodes where RPCs are sent, based on the subset of the cluster that contain one or more of K.
> // question: does this mean only K/V pairs that are in K... are passed in to the mapper?
> R r = c.invoke(mapper, reducer, collator, K...);
>
> Using futures:
> NotifyingFuture<R>  f = c.invokeFuture(mapper, reducer, collator)
>
> Example: implementing a word count, but only for keys that start with "text":
>
> Mapper<String, String, Integer> mapper = new Mapper<String, String, Integer>() {
>    Integer map(String k, String v) {
>       return k.startsWith("text") ? v.length() : null;
>    }
> };
>
> Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>() {
>    Integer reduce(Integer transformed, Integer prevReduced) { return transformed + prevReduced; }
> };
>
> Collator<Integer> collator = new Collator<Integer>() {
>    int collated = 0;
>    void add(Address origin, Integer result) { collated += result; }
>    Integer collate() { return collated; }
> };
>
>
> WDYT?  :-)
>
> Cheers
> Manik
>
>
>
> On 3 Jan 2011, at 11:37, Vladimir Blagojevic wrote:
>
>> On 11-01-03 6:16 AM, Galder Zamarreño wrote:
>>> Maybe I'm reading this wrong but are you saying that multiple caches cause problem with mapping of task units to nodes in cluster?
>>>
>>> Or are you just doing it not to clutter the API?
>> Clutter of API. If you did not like K,V,T,R imagine dealing with
>> multiple cache confusion! It would be horrible.
>>
>>> I think DistributedTaskContext extending CacheContainer is rather confusing, particularly when DistributedTaskContext has K,V parameters that generally are associated with Cache rather than CacheContainer.
>> Yes, true but DistributedTaskContext is primarily geared towards one
>> cache while providing opportunity to read data from other caches as
>> well. Hence K,V for the primary cache. Any suggestions how to deal with
>> this in a more elegant way? Maybe pass DistributedTaskContext and
>> CacheContainer as separate parameters?
>>
>>
>>> Also, why is a context iterable? Iterates the contents of a CacheContainer? extends generally means that "is something". AFAIK, you'd be able to iterate a Map or Cache, but not a CacheContainer.
>> True.
>>
>>> Personally, I think DistributedTask has too many generics (K, V, T, R) and it's hard to read. IMO, only T and R should only exist. I would also try to stick to Callable conventions that takes a V.
>>>
>>> I don't like to see things like this, reminds me of EJB 2.1 where you were forced to implement a method to simply get hold of a ctx. There're much nicer ways to do things like this, if completely necessary (see EJB3) :
>> You mean injection? There is a proposal 2 that essentially does this.
>>
>>>        @Override
>>>        public void mapped(DistributedTaskContext<String, String>   ctx) {
>>>           this.ctx = ctx;
>>>        }
>>>
>>> Looking at the example provided, it seems to me that all DistributedTaskContext is used for is to navigate the Cache contents from a user defined callable, in which case I would limit its scope.
>> What do you mean - "limit its scope"?
>>
>> _______________________________________________
>> infinispan-dev mailing list
>> [hidden email]
>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
> --
> Manik Surtani
> [hidden email]
> twitter.com/maniksurtani
>
> Lead, Infinispan
> http://www.infinispan.org
>
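[Editor's note: Manik's Mapper/Reducer/Collator proposal above can be exercised end to end in a single JVM. The sketch below is not the proposed implementation — `runNode`, the `String` origin standing in for `Address`, and the hand-rolled driver in `main` are all stand-ins introduced here — but it shows the four numbered steps concretely.]

```java
import java.util.LinkedHashMap;
import java.util.Map;

// The three interfaces from the proposal; Address is replaced by a plain
// String origin so the sketch runs locally. None of this is a shipped API.
interface Mapper<K, V, T> { T map(K key, V value); }
interface Reducer<T, R> { R reduce(T transformed, R prevReduced); }
interface Collator<R> { void add(String origin, R remote); R collate(); }

public class WordCountSketch {

    // Steps 1-2 as they would run on one node: map every entry, drop nulls,
    // fold the surviving T values with the reducer.
    static <K, V, T, R> R runNode(Map<K, V> localCache, Mapper<K, V, T> mapper,
                                  Reducer<T, R> reducer, R initial) {
        R acc = initial;
        for (Map.Entry<K, V> e : localCache.entrySet()) {
            T t = mapper.map(e.getKey(), e.getValue());
            if (t != null) acc = reducer.reduce(t, acc);  // null results are filtered out
        }
        return acc;
    }

    public static void main(String[] args) {
        Mapper<String, String, Integer> mapper = (k, v) -> k.startsWith("text") ? v.length() : null;
        Reducer<Integer, Integer> reducer = (t, prev) -> t + prev;
        Collator<Integer> collator = new Collator<Integer>() {
            int collated = 0;
            public void add(String origin, Integer result) { collated += result; }
            public Integer collate() { return collated; }
        };

        // Two fake "nodes", each owning a slice of the cache.
        Map<String, String> node1 = new LinkedHashMap<>();
        node1.put("text1", "hello");
        node1.put("meta1", "ignored");
        Map<String, String> node2 = new LinkedHashMap<>();
        node2.put("text2", "hi");

        // Steps 3-4: each node's reduced R reaches the collator,
        // which produces the final result.
        collator.add("node1", runNode(node1, mapper, reducer, 0));
        collator.add("node2", runNode(node2, mapper, reducer, 0));
        System.out.println(collator.collate());  // prints 7 (5 + 2)
    }
}
```

Note that, as in the email's open question, only entries surviving the mapper's filter reach the reducer; the key-based node filter (`invoke(..., K...)`) is a separate concern.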

Re: [infinispan-dev] Distributed execution framework - API proposal(s)

Galder Zamarreño
In reply to this post by Vladimir Blagojevic
Something else to add here. Earlier today I was looking at GridGain and the functional way to define map/reduce functions, and it seemed quite appealing, particularly because they had managed to do it for Java as well:
"Native Java & Scala Support" section in http://www.gridgain.com/product.html

Having contracted the Scala virus last year, this way of coding functions looks more appealing (to me) than the way they did it in 2.0, which is fairly close to what is currently being proposed.

Maybe something not for the first version, but something definitely worth keeping in mind.

On Dec 27, 2010, at 5:25 PM, Vladimir Blagojevic wrote:

> Hey,
>
> I spent the last week working on concrete API proposals for the distributed
> execution framework. I believe we are close to finalizing the
> proposal, so your input and feedback is important now! Here are the main
> ideas where I think we made progress since we last talked.
>
>
> Access to multiple caches during task execution
>
> While we have agreed to allow access to multiple caches during task
> execution, including this logic in the task API complicates it greatly. The
> compromise I found is to focus the whole API on one specific cache but
> allow access to other caches through the DistributedTaskContext API. The
> focus on one specific cache and its input keys allows us to
> properly CH-map task units across the Infinispan cluster and will cover most
> of the use cases. A DistributedTaskContext can also easily be mapped to a
> single cache. See DistributedTask and DistributedTaskContext for more
> details.
>
>
> DistributedTask and DistributedCallable
>
> I found it useful to separate general task characteristics from the actual
> work/computation details. The main task characteristics are therefore
> specified through the DistributedTask API, and the details of the actual
> computation are specified through the DistributedCallable API.
> DistributedTask specifies coarse task details (the failover policy, the
> task splitting policy, the cancellation policy and so on), while in the
> DistributedCallable API implementers focus on the actual details of a
> computation/work unit.
>
>
> I have updated the original document [1] to reflect the API update. You can
> see the actual proposal in git here [2], and I have also included a
> variation of this approach [3] that separates the map and reduce task phases
> into separate interfaces and removes the DistributedCallable interface. I
> have also kept Trustin's ideas in another proposal [4] since I would
> like to include them as well if possible.
>
> Regards,
> Vladimir
>
>
> [1] http://community.jboss.org/wiki/InfinispanDistributedExecutionFramework
> [2] https://github.com/vblagoje/infinispan/tree/t_ispn-39_master_prop1
> [3] https://github.com/vblagoje/infinispan/tree/t_ispn-39_master_prop2
> [4] https://github.com/vblagoje/infinispan/tree/t_ispn-39_master_prop3
>

--
Galder Zamarreño
Sr. Software Engineer
Infinispan, JBoss Cache



Re: [infinispan-dev] Distributed execution framework - API proposal(s)

Galder Zamarreño

On Jan 5, 2011, at 11:30 AM, Galder Zamarreño wrote:

> Something else to add here. Earlier today I was looking at GridGain and the functional way to define map/reduce functions and seemed quite appealing, particularly cos they had managed to do it for Java as well:
> "Native Java & Scala Support" section in http://www.gridgain.com/product.html
>
> Having contracted the Scala virus last year, this way of coding functions looks more appealing (to me) than the way they did in 2.0 which is fairly close to what is currently being proposed.
>
> Maybe something not for the first version, but something definitely worth keeping in mind.

A few years ago this might have been a niche, but with Scala and Java 7 (later this year), I see this way of coding becoming more and more ubiquitous, so maybe we should do this right from the start? I could certainly give a hand with this part.


--
Galder Zamarreño
Sr. Software Engineer
Infinispan, JBoss Cache



Re: [infinispan-dev] Distributed execution framework - API proposal(s)

Sanne Grinovero
While I agree on the importance of a nice API, I'd polish the Java one first.
This article from May 2008 is already showing off a pretty nice example:
http://www.infoq.com/news/2008/03/fork_join

IMHO even the Java-coded version is very nice, and I don't foresee
people writing hundreds of F/J functions, so clarity is far more
important than brevity and syntax coolness.
I'm not against a nice Scala API, always nice to have, but the Java
one should be made understandable first. (And sorry, it might be my
lack of field experience, but I didn't understand how I would code a
simple exercise with the current proposals, while the F/J example
speaks for itself.)

Sanne

2011/1/5 Galder Zamarreño <[hidden email]>:

>
> On Jan 5, 2011, at 11:30 AM, Galder Zamarreño wrote:
>
>> Something else to add here. Earlier today I was looking at GridGain and the functional way to define map/reduce functions and seemed quite appealing, particularly cos they had managed to do it for Java as well:
>> "Native Java & Scala Support" section in http://www.gridgain.com/product.html
>>
>> Having contracted the Scala virus last year, this way of coding functions looks more appealing (to me) than the way they did in 2.0 which is fairly close to what is currently being proposed.
>>
>> Maybe something not for the first version, but something definitely worth keeping in mind.
>
> A few years this might have been a niche, but since Scala and Java 7 (later this year), I see this way of coding becoming more and more ubiquitous, so maybe we should do this right from the start? I could certainly give a hand with this part.
>


Re: [infinispan-dev] Distributed execution framework - API proposal(s)

Vladimir Blagojevic
In reply to this post by Vladimir Blagojevic
These two approaches are not mutually exclusive. Here is what we can do.
We can have a very simple "execute this runnable on remote nodes and
collect results" model as the infrastructure for the higher-layer
map/reduce you proposed, Manik. That way we cover both ends: a simple
execution model on remote nodes for a given input data set, and a more
sophisticated, original map/reduce model built on top of it. Users can
choose what fits their needs best. I can definitely see a need arising
for both of these approaches.

WDYT?

On 11-01-05 12:46 AM, Vladimir Blagojevic wrote:

> Manik,
>
> Of course we could go this direction as well. This is very similar to
> the Hadoop approach and close to the original map/reduce paradigm. I
> intentionally changed this paradigm into a simpler one because I read a
> lot of criticism about how hard it is for "regular" developers to adapt
> to the original map/reduce paradigm. Hence the simpler approach of mapping
> runnables to execution nodes and collating results - unfortunately I
> named them map and reduce as well.
>
> Does anyone else have an opinion while I think about this a bit more?
>
> Regards,
> Vladimir
>

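[Editor's note: Vladimir's layering idea — a plain "run this on the nodes, collect the results" primitive with map/reduce built on top — can be sketched like this. `NodeTask`, `runOnAllNodes`, and `mapReduce` are illustrative names invented here, not the proposed API, and each "node" is just a local Map slice.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class LayeringSketch {

    // What a distributed callable would see: the slice of the cache on one node.
    interface NodeTask<K, V, R> { R call(Map<K, V> localData); }

    // Lower layer: in the real framework each iteration would be an RPC
    // to a remote node; here it is a local call per "node" slice.
    static <K, V, R> List<R> runOnAllNodes(List<Map<K, V>> nodes, NodeTask<K, V, R> task) {
        List<R> results = new ArrayList<>();
        for (Map<K, V> localData : nodes) results.add(task.call(localData));
        return results;
    }

    // Higher layer: map/reduce is just a per-node task plus a final collation fold.
    static <K, V, R> R mapReduce(List<Map<K, V>> nodes, NodeTask<K, V, R> perNode,
                                 BinaryOperator<R> collate, R identity) {
        R acc = identity;
        for (R r : runOnAllNodes(nodes, perNode)) acc = collate.apply(acc, r);
        return acc;
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> nodes = List.of(Map.of("a", 1, "b", 2), Map.of("c", 3));
        // Per-node task: sum the local values; collation: sum the per-node sums.
        Integer total = mapReduce(nodes,
                local -> local.values().stream().mapToInt(Integer::intValue).sum(),
                (x, y) -> x + y, 0);
        System.out.println(total);  // prints 6
    }
}
```

The point of the split is exactly the one made in the mail: users who only need "run this everywhere and give me the results" stop at the lower layer, while the map/reduce layer is one particular client of it.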

Re: [infinispan-dev] Distributed execution framework - API proposal(s)

Manik Surtani
In reply to this post by Sanne Grinovero
I agree with Sanne.  A top-level Scala API may be nice, but it is definitely lower in priority than a more traditional Java-centric API using Java paradigms.  The latter is still what most folks will use for some time to come.


On 5 Jan 2011, at 13:11, Sanne Grinovero wrote:

> While I agree on the importance of a nice API, I'd polish the Java one first.
> This article from May 2008 is already showing off a pretty nice example:
> http://www.infoq.com/news/2008/03/fork_join
>
> IMHO even the Java-coded version is very nice, and I don't foresee
> people writing hundreds of F/J functions, so clarity is far more
> important than brevity and syntax coolness.
> I'm not against a nice Scala API, always nice to have, but the Java
> one should be made understandable first. (And sorry it might be my
> lack of field experience, but I didn't understand how I should code a
> simple exercise with the current proposals, while the F/J example is
> self-speaking coolness).
>
> Sanne
>

--
Manik Surtani
[hidden email]
twitter.com/maniksurtani

Lead, Infinispan
http://www.infinispan.org





Re: [infinispan-dev] Distributed execution framework - API proposal(s)

Manik Surtani
In reply to this post by Vladimir Blagojevic

On 5 Jan 2011, at 15:21, Vladimir Blagojevic wrote:

> These two approaches are not mutually exclusive. Here is what we can do.
> We can have a very simple "execute this runnable on remote nodes and
> collect results" model as an infrastructure for a higher layer
> map/reduce you proposed Manik. That way we cover both ends - a simple
> execution model on remote nodes for a given input data set and a more
> sophisticated, original map/reduce model built on top of it. Users can
> choose what fit their needs best. I can definitely see a need arising
> for both of these approaches.

Ok, so something like what I proposed, plus:

DistributedCallable<T> extends Callable<T> {
        void setEmbeddedCacheManager(EmbeddedCacheManager ecm);
        void setCache(Cache<?, ?> c);
}

DistributedRemoteTask<T> dt = new DistributedRemoteTask<T>(cache, new DistributedCallable<T>() { ... });
List<Future<T>> res = dt.invoke(); // executes on all nodes
List<Future<T>> res = dt.invoke(K...); // executes on all nodes containing one or more of K

and expect the user to iterate over whatever is needed, performing any mapping, transforming, reducing, etc. in DistributedCallable.call()?

If so, then we need to define the following APIs (along with an example of using each, so we are clear on how each API works).

M/R API:
        Mapper<K, V, T>
        Reducer<T, R>
        Collator<R>
        MapReduceTask<T, R>

DistributedCallable API:
        DistributedCallable<K, V, R>
        DistributedRemoteTask<R>

right?

MapReduceTask could extend DistributedRemoteTask... (maybe) ... and would need the following overloaded invocation methods, perhaps:

R invoke(); //invoke on all nodes
R invoke(K...); // invoke on a subset of nodes
R invoke(Address...); // invoke on a subset of nodes
Future<R> invokeAsync(); //invoke on all nodes
Future<R> invokeAsync(K...); // invoke on a subset of nodes
Future<R> invokeAsync(Address...); // invoke on a subset of nodes

Or, perhaps, making use of a more fluent API:

R result = new MapReduceTask(cache).on(K...).mappedWith(mapper).reducedWith(reducer).collate(collator);

Future<R> result = new MapReduceTask(cache).on(K...).mappedWith(mapper).reducedWith(reducer).collateAsynchronously(collator);

List<R> result = new DistributedRemoteTask(cache).on(K...).execute(distributedCallable);

List<Future<R>> result = new DistributedRemoteTask(cache).on(K...).executeAsync(distributedCallable);


WDYT?  I think the fluent API makes a lot of sense, and it is very clear/user friendly.

Cheers
Manik
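[Editor's note: the fluent `on(...).mappedWith(...).reducedWith(...).collate(...)` chain above can be given a concrete skeleton. This is a single-JVM stand-in written for illustration — the method names follow the proposal, but the "cluster" is one local Map and the collator receives a single per-node result, so only the API shape is demonstrated.]

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class MapReduceTask<K, V, T, R> {

    public interface Mapper<K, V, T> { T map(K key, V value); }
    public interface Reducer<T, R> { R reduce(T transformed, R prevReduced); }
    public interface Collator<R> { void add(R remote); R collate(); }

    private final Map<K, V> cache;
    private List<K> keys = List.of();
    private Mapper<K, V, T> mapper;
    private Reducer<T, R> reducer;

    public MapReduceTask(Map<K, V> cache) { this.cache = cache; }

    @SafeVarargs
    public final MapReduceTask<K, V, T, R> on(K... ks) { keys = Arrays.asList(ks); return this; }
    public MapReduceTask<K, V, T, R> mappedWith(Mapper<K, V, T> m) { mapper = m; return this; }
    public MapReduceTask<K, V, T, R> reducedWith(Reducer<T, R> r) { reducer = r; return this; }

    // Terminal operation: filter by the on(K...) keys, map, fold, collate.
    public R collate(Collator<R> collator) {
        R reduced = null;  // the reducer sees null as "no previous value"
        for (Map.Entry<K, V> e : cache.entrySet()) {
            if (!keys.isEmpty() && !keys.contains(e.getKey())) continue;
            T t = mapper.map(e.getKey(), e.getValue());
            if (t != null) reduced = reducer.reduce(t, reduced);
        }
        collator.add(reduced);
        return collator.collate();
    }
}
```

With a builder like this, Manik's chained line reads exactly as written: `new MapReduceTask(cache).on(k1, k2).mappedWith(mapper).reducedWith(reducer).collate(collator)`; the async variants would return a `Future<R>` from the same terminal step instead of blocking.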


>
> WDYT?
>
> On 11-01-05 12:46 AM, Vladimir Blagojevic wrote:
>> Manik,
>>
>> Of course we could go this direction as well. This is very similar to
>> Hadoop approach and close to original map/reduce paradigm. I
>> intentionally changed this paradigm into a simpler one because I read a
>> lot of criticism how it is very hard for "regular" developers to adapt
>> to original map/reduce paradigm. Hence simpler approach of mapping
>> runnable to execution nodes and collating results - unfortunately I
>> named them map and reduce as well.
>>
>> Anyone else has an opinion while I think about this a bit more?
>>
>> Regards,
>> Vladimir
>>
>>
>>
>>
>> On 11-01-04 3:47 PM, Manik Surtani wrote:
>>> Also, I think we need to be clear about these 2 (map and reduce) functions.  Map doesn't mean "pick node to run task on" in map/reduce speak.  Map means select /transform data for inclusion into a result set.  Perhaps it also makes sense to use smaller/simpler interfaces.  I know this breaks away from the F/J API, but I'm beginning to wonder if there is a proper alignment of purpose here in the first place - going back on my original plans here.  How's this for an alternate API:
>>>
>>> Mapper<K, V, T>   {
>>> // just "maps" entries on a remote node.  Map = filter and transform.  Invoked once for each entry on a remote cache.
>>> // null responses are ignored/filtered out.
>>> T map(K, V);
>>> }
>>>
>>> Reducer<T, R>   {
>>> // incrementally reduces a transformed entry.  Called once for each T produced by the mapper.
>>> // previously reduced value passed in each time.
>>> R reduce(T, R);
>>> }
>>>
>>> Collator<R>   {
>>> // Adds reduced results from remote nodes.  Called once for each R returned by a RemoteReducer.
>>> void add(Address origin, R remote);
>>>
>>> // collates all results added so far.
>>> R collate();
>>> }
>>>
>>>
>>> And the API could do something like
>>>
>>> MapReduceContext c = new MapReduceContext(cache);
>>>
>>> // 1) distributes 'mapper' cluster wide.  Calls mapper.map() for each K/V pair.  Stores result T for each invocation if T != null.
>>> // 2) For each T, reducer.reduce() is called.  Each time, the previous value of R is passed back in to reduce().
>>> // 3) Final value of R is sent back as an RPC result.  For each result, the address and R are passed to collator.add()
>>> // 4) Once all remote RPCs have responded, collator.collate() is called and the result is passed back to the caller.
>>> R r = c.invoke(mapper, reducer, collator);
>>>
>>> Variants may include:
>>>
>>> Filtering nodes:
>>> // restricts the set of nodes where RPCs are sent, based on the subset of the cluster that contain one or more of K.
>>> // question: does this mean only K/V pairs that are in K... are passed in to the mapper?
>>> R r = c.invoke(mapper, reducer, collator, K...);
>>>
>>> Using futures:
>>> NotifyingFuture<R> f = c.invokeFuture(mapper, reducer, collator);
>>>
>>> Example:  implementing a word count. but only for keys that start with "text" :
>>>
>>> Mapper<String, String, Integer> mapper = new Mapper<String, String, Integer>() {
>>>     Integer map(String k, String v) {
>>>         return k.startsWith("text") ? v.length() : null;
>>>     }
>>> };
>>>
>>> Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>() {
>>>     Integer reduce(Integer transformed, Integer prevReduced) { return transformed + prevReduced; }
>>> };
>>>
>>> Collator<Integer> collator = new Collator<Integer>() {
>>>     int collated = 0;
>>>     void add(Address origin, Integer result) { collated += result; }
>>>     Integer collate() { return collated; }
>>> };
>>>
>>>
>>> WDYT?  :-)
>>>
>>> Cheers
>>> Manik
>>>
>>>
>>>
>>> On 3 Jan 2011, at 11:37, Vladimir Blagojevic wrote:
>>>
>>>> On 11-01-03 6:16 AM, Galder Zamarreño wrote:
>>>>> Maybe I'm reading this wrong but are you saying that multiple caches cause problem with mapping of task units to nodes in cluster?
>>>>>
>>>>> Or are you just doing it not to clutter the API?
>>>> Clutter of API. If you did not like K,V,T,R imagine dealing with
>>>> multiple cache confusion! It would be horrible.
>>>>
>>>>> I think DistributedTaskContext extending CacheContainer is rather confusing, particularly when DistributedTaskContext has K,V parameters that generally are associated with Cache rather than CacheContainer.
>>>> Yes, true but DistributedTaskContext is primarily geared towards one
>>>> cache while providing opportunity to read data from other caches as
>>>> well. Hence K,V for the primary cache. Any suggestions how to deal with
>>>> this in a more elegant way? Maybe pass DistributedTaskContext and
>>>> CacheContainer as separate parameters?
>>>>
>>>>
>>>>> Also, why is a context iterable? Iterates the contents of a CacheContainer? extends generally means that "is something". AFAIK, you'd be able to iterate a Map or Cache, but not a CacheContainer.
>>>> True.
>>>>
>>>>> Personally, I think DistributedTask has too many generics (K, V, T, R) and it's hard to read. IMO, only T and R should only exist. I would also try to stick to Callable conventions that takes a V.
>>>>>
>>>>> I don't like to see things like this, reminds me of EJB 2.1 where you were forced to implement a method to simply get hold of a ctx. There're much nicer ways to do things like this, if completely necessary (see EJB3) :
>>>> You mean injection? There is a proposal 2 that essentially does this.
>>>>
>>>>>        @Override
>>>>>        public void mapped(DistributedTaskContext<String, String>    ctx) {
>>>>>           this.ctx = ctx;
>>>>>        }
>>>>>
>>>>> Looking at the example provided, it seems to me that all DistributedTaskContext is used for is to navigate the Cache contents from a user defined callable, in which case I would limit its scope.
>>>> What do you mean - "limit its scope"?
>>>>
>>>> _______________________________________________
>>>> infinispan-dev mailing list
>>>> [hidden email]
>>>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>> --
>>> Manik Surtani
>>> [hidden email]
>>> twitter.com/maniksurtani
>>>
>>> Lead, Infinispan
>>> http://www.infinispan.org
>>>
>>>
>>>
>>>


Re: [infinispan-dev] Distributed execution framework - API proposal(s)

Vladimir Blagojevic
On 11-01-05 1:19 PM, Manik Surtani wrote:

> Ok, so something like what I proposed, plus:
>
> DistributedCallable<T> extends Callable<T> {
> setEmbeddedCacheManager(ecm);
> setCache(c);
> }
>
> DistributedRemoteTask dt = new DistributedRemoteTask(cache, new DistributedCallable<T>() {... });
> List<Future<T>>  res = dt.invoke(); // executes on all nodes
> List<Future<T>>  res = dt.invoke(K...); // executes on all nodes containing one or more of K
>
> and expect the user to iterate over whatever is needed, performing any mapping, transforming, reducing, etc in DistributedCallable.call()?

Essentially yes. Let's merge it with
https://github.com/vblagoje/infinispan/commit/b6cd39fe56e1cced96926ed09e69f9d6a823d64e

Forget for a moment about the misnamed map/reduce. We give users access to
choose the execution nodes they want, or let CH handle it for them by
default. We could have invoke on the task return futures as you
suggested, and/or keep reduce and have invoke return R. Those are just
detail variations of the same concept. Furthermore, all distribution and
execution related policies would be set at the DistributedTask level. There
would be one DistributedCallable per Infinispan node, and the
DistributedTaskContext given to that Callable would be set up to reflect
all K,V pairs residing on that node. Users also have access to other
caches if needed. Finally, see the WordCount example to get a feel for it.
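The per-node shape described above can be sketched in one JVM. Everything here is hypothetical: the interface and method names (DistributedTaskContext, getLocalKeys, setContext) are assumptions made for illustration, not the proposal's exact API:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;

// Hypothetical shapes sketched from the thread; names are assumptions.
interface DistributedTaskContext<K, V> {
    Set<K> getLocalKeys();   // only the K,V pairs residing on this node
    V get(K key);
}

abstract class DistributedCallable<K, V, T> implements Callable<T> {
    protected DistributedTaskContext<K, V> ctx;
    void setContext(DistributedTaskContext<K, V> ctx) { this.ctx = ctx; }
}

public class NodeCallableSketch {
    public static void main(String[] args) throws Exception {
        // Simulate the slice of the cache owned by one node.
        Map<String, String> local = Map.of("a", "foo", "b", "quux");

        DistributedTaskContext<String, String> ctx =
                new DistributedTaskContext<String, String>() {
                    public Set<String> getLocalKeys() { return local.keySet(); }
                    public String get(String key) { return local.get(key); }
                };

        // One callable per node; call() only ever sees the local keys.
        DistributedCallable<String, String, Integer> unit =
                new DistributedCallable<String, String, Integer>() {
                    public Integer call() {
                        int chars = 0;
                        for (String k : ctx.getLocalKeys())
                            chars += ctx.get(k).length();
                        return chars;
                    }
                };
        unit.setContext(ctx);
        System.out.println(unit.call());   // 7 = "foo".length() + "quux".length()
    }
}
```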



> If so, then we need to define the following APIs (along with an example of using each, so we are clear on how each API works).
>
> M/R API:
> Mapper<K, V, T>
> Reducer<T, R>
> Collator<R>
> MapReduceTask<T, R>
>
> DistributedCallable API:
> DistributedCallable<K, V, R>
> DistributedRemoteTask<R>
>
> right?
>
> MapReduceTask could extend DistributedRemoteTask... (maybe) ... and would need the following overloaded invocation methods, perhaps:
>
> R invoke(); //invoke on all nodes
> R invoke(K...); // invoke on a subset of nodes
> R invoke(Address...); // invoke on a subset of nodes
> Future<R>  invokeAsync(); //invoke on all nodes
> Future<R>  invokeAsync(K...); // invoke on a subset of nodes
> Future<R>  invokeAsync(Address...); // invoke on a subset of nodes
>
> Or, perhaps, making use of a more fluent API:
>
> R result = new MapReduceTask(cache).on(K...).mappedWith(mapper).reducedWith(reducer).collate(collator);
>
> Future<R>  result = new MapReduceTask(cache).on(K...).mappedWith(mapper).reducedWith(reducer).collateAsynchronously(collator);
>
> List<R>  result = new DistributedRemoteTask(cache).on(K...).execute(distributedCallable);
>
> List<Future<R>>  result = new DistributedRemoteTask(cache).on(K...).executeAsync(distributedCallable);
>



Awesome! I love this fluent API proposal!

Excellent! Anyone else?



> WDYT?  I think the fluent API makes a lot of sense, and it is very clear/user friendly.
>
> Cheers
> Manik
>


Re: [infinispan-dev] Distributed execution framework - API proposal(s)

Manik Surtani

On 5 Jan 2011, at 17:05, Vladimir Blagojevic wrote:

Or, perhaps, making use of a more fluent API:

R result = new MapReduceTask(cache).on(K...).mappedWith(mapper).reducedWith(reducer).collate(collator);

Future<R>  result = new MapReduceTask(cache).on(K...).mappedWith(mapper).reducedWith(reducer).collateAsynchronously(collator);

List<R>  result = new DistributedRemoteTask(cache).on(K...).execute(distributedCallable);

List<Future<R>>  result = new DistributedRemoteTask(cache).on(K...).executeAsync(distributedCallable);




Awesome! I love this fluent API proposal!


Cool.  Then if this is the case let's stick with the fluent API - I think it would make life much easier for developers.




Re: [infinispan-dev] Distributed execution framework - API proposal(s)

Eduardo Martins
You probably already discussed this, sorry if that's the case, but for
someone new (but interested) to the discussion: why not start with an
ExecutorService impl that executes in the Infinispan cloud?
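The appeal being that plain Callables could be submitted unchanged through the standard interface. A rough sketch of that shape (a local thread pool stands in for the Infinispan cloud; DistributedExecutorService is a hypothetical name):

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorSketch {
    public static void main(String[] args) throws Exception {
        // A hypothetical DistributedExecutorService would implement the
        // standard java.util.concurrent.ExecutorService; here a local
        // pool stands in for the cluster.
        ExecutorService exec = Executors.newFixedThreadPool(2);
        List<Callable<Integer>> units =
                List.of(() -> 1, () -> 2, () -> 3);  // per-node work units
        int sum = 0;
        for (Future<Integer> f : exec.invokeAll(units))
            sum += f.get();                          // gather partial results
        exec.shutdown();
        System.out.println(sum);                     // 1 + 2 + 3 = 6
    }
}
```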

-- Eduardo
..............................................
http://emmartins.blogspot.com
http://redhat.com/solutions/telco



On Wed, Jan 5, 2011 at 6:39 PM, Manik Surtani <[hidden email]> wrote:

>
> On 5 Jan 2011, at 17:05, Vladimir Blagojevic wrote:
>
> Or, perhaps, making use of a more fluent API:
>
> R result = new
> MapReduceTask(cache).on(K...).mappedWith(mapper).reducedWith(reducer).collate(collator);
>
> Future<R>  result = new
> MapReduceTask(cache).on(K...).mappedWith(mapper).reducedWith(reducer).collateAsynchronously(collator);
>
> List<R>  result = new
> DistributedRemoteTask(cache).on(K...).execute(distributedCallable);
>
> List<Future<R>>  result = new
> DistributedRemoteTask(cache).on(K...).executeAsync(distributedCallable);
>
>
>
>
> Awesome! I love this fluent API proposal!
>
>
> Cool.  Then if this is the case lets stick with the fluent API - I think it
> would make life much easier for developers.


Re: [infinispan-dev] Distributed execution framework - API proposal(s)

Vladimir Blagojevic
In reply to this post by Manik Surtani
On 11-01-05 3:39 PM, Manik Surtani wrote:



Awesome! I love this fluent API proposal!


Cool.  Then if this is the case lets stick with the fluent API - I think it would make life much easier for developers.

--
Manik Surtani


The proposal Manik and I talked about is here: https://github.com/vblagoje/infinispan/commit/59b0c3cb78bce70fdcd49d2d36f68de110944fc6

I have slightly changed it to better match the map/reduce paradigm described here: http://labs.google.com/papers/mapreduce.html
Therefore, we now have both map/reduce and a simpler "execute this task unit on remote nodes and return results" proposal.
Users might not like the number of type parameters in MapReduceTask. I wanted to be as precise as possible to begin with, and we can discuss how and whether to reduce them.
The example for map/reduce is WordCountExample.java. Have a look.

I put the proposal in a distexec package. I wanted a parent package to branch from: we need to accommodate a simple execution model, map/reduce, various execution policies and so on, and they could not all fit in a mapreduce package and its children. Hence the distexec package. If you have a better name, speak up.
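For anyone who hasn't checked out the branch: the paradigm from the paper boils down to a map phase emitting (word, 1) pairs and a reduce phase summing the counts per word. A plain-Java sketch of that shape (this is an illustration, not the branch's WordCountExample.java):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCountParadigm {
    public static void main(String[] args) {
        List<String> documents = List.of("the quick fox", "the lazy dog the");

        // Map phase: emit (word, 1) for every word occurrence.
        Map<String, List<Integer>> intermediate = new HashMap<>();
        for (String doc : documents)
            for (String word : doc.split("\\s+"))
                intermediate.computeIfAbsent(word, w -> new ArrayList<>()).add(1);

        // Reduce phase: sum the emitted counts per word.
        Map<String, Integer> counts = new HashMap<>();
        for (Map.Entry<String, List<Integer>> e : intermediate.entrySet()) {
            int sum = 0;
            for (int one : e.getValue()) sum += one;
            counts.put(e.getKey(), sum);
        }

        System.out.println(counts.get("the"));   // "the" occurs 3 times
    }
}
```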


Regards,
Vladimir
