Class AggregateAPI

java.lang.Object
io.avery.vinyl.AggregateAPI

public class AggregateAPI extends Object
A configurator used to define an aggregate operation on a record-stream.

An aggregate operation may define zero or more keys that together group the input stream into non-overlapping partitions. The keys may be retained as output fields or discarded. Each partition will capture one or more input records and produce exactly one output record. In the special case that no keys are used and the stream is empty, the default partition will capture no input records but will still produce exactly one output record.

An aggregate operation may define fields as aggregate functions. An aggregate function is applied per partition and returns one value for that partition, which becomes the value of the field on the partition's output record.
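The JDK's own Collectors behave in a closely analogous way. The sketch below is only an analogy and does not use or reproduce the vinyl implementation; the `PartitionDemo` class and its `Purchase` record are hypothetical. Grouping by a key yields one aggregate result per distinct key, so an empty input yields none. A keyless aggregate always yields exactly one result, even for an empty input, which mirrors the default-partition special case.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// JDK-only analogy for partitioning and per-partition aggregates; not the vinyl implementation.
final class PartitionDemo {
    // Hypothetical input record: a vendor and a purchase amount.
    record Purchase(String vendor, long amount) {}

    // Keyed: one output per distinct vendor, so an empty input yields no outputs.
    static Map<String, Long> totalByVendor(List<Purchase> purchases) {
        return purchases.stream()
                .collect(Collectors.groupingBy(Purchase::vendor,
                        Collectors.summingLong(Purchase::amount)));
    }

    // Keyless: the single default partition always yields exactly one output.
    static long total(List<Purchase> purchases) {
        return purchases.stream()
                .collect(Collectors.summingLong(Purchase::amount));
    }

    public static void main(String[] args) {
        List<Purchase> purchases = List.of(
                new Purchase("acme", 10), new Purchase("acme", 5), new Purchase("globex", 7));
        System.out.println(totalByVendor(purchases)); // two partitions (acme=15, globex=7), unordered
        System.out.println(totalByVendor(List.of())); // {}
        System.out.println(total(List.of()));         // 0
    }
}
```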

Fields defined on the configurator (or any sub-configurators) become fields on a resultant header. The header fields are in order of definition on the configurator(s). A field may be redefined on the configurator(s), in which case its definition is replaced, but its original position in the header is retained.
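This ordering rule behaves like `java.util.LinkedHashMap`, whose documentation states that insertion order is not affected when a key is re-inserted. The sketch below uses that map purely as an analogy. The field names and the string "definitions" are hypothetical stand-ins for real field definitions.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Analogy only: a LinkedHashMap keeps a key's original position when its value is replaced,
// mirroring how a redefined field keeps its original position in the header.
final class HeaderOrderDemo {
    static Map<String, String> define() {
        Map<String, String> header = new LinkedHashMap<>();
        header.put("vendorId", "key: lookup of vendorId");
        header.put("total", "aggregate: sum of amount");
        header.put("count", "aggregate: count of records");
        // Redefinition replaces the definition, but "vendorId" stays first.
        header.put("vendorId", "key: vendorId upper-cased");
        return header;
    }

    public static void main(String[] args) {
        Map<String, String> header = define();
        System.out.println(List.copyOf(header.keySet())); // [vendorId, total, count]
        System.out.println(header.get("vendorId"));       // key: vendorId upper-cased
    }
}
```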

  • Nested Class Summary

    Nested Classes
    Modifier and Type    Class and Description
    class                AggregateAPI.Aggs<T>
                         A sub-configurator used to define fields of an aggregate operation that depend on a common intermediate aggregation result.
    class                AggregateAPI.Keys<T>
                         A sub-configurator used to define keys of an aggregate operation that depend on a common intermediate result.
    class                AggregateAPI.Route<T>
                         A sub-configurator used to define fields of an aggregate operation that aggregate over values emitted to them by a shared routing function.
  • Method Summary

    Modifier and Type    Method and Description
    <T> AggregateAPI     aggField(Field<T> field, Collector<? super Record,?,? extends T> collector)
                         Defines (or redefines) the given field as an aggregate over input records, using the given collector.
    <T> AggregateAPI     aggs(Collector<? super Record,?,? extends T> collector, Consumer<AggregateAPI.Aggs<T>> config)
                         Configures a sub-configurator that may define (or redefine) fields in terms of the result of aggregating the input records using the given collector.
    AggregateAPI         key(Field<?> field)
                         Defines a key as the lookup of the given field on each input record.
    AggregateAPI         key(Function<? super Record,?> mapper)
                         Defines a key as the application of the given function to each input record.
    AggregateAPI         keyField(Field<?> field)
                         Defines (or redefines) the given field as a key that looks up the same field on each input record.
    <T> AggregateAPI     keyField(Field<T> field, Function<? super Record,? extends T> mapper)
                         Defines (or redefines) the given field as a key that applies the given function to each input record.
    AggregateAPI         keyFields(Field<?>... fields)
                         Defines (or redefines) each of the given fields as keys that look up the same field on each input record.
    AggregateAPI         keys(Field<?>... fields)
                         Defines a key for each of the given fields as the lookup of that field on each input record.
    <T> AggregateAPI     keys(Function<? super Record,? extends T> mapper, Consumer<AggregateAPI.Keys<T>> config)
                         Configures a sub-configurator that may define (or redefine) keys in terms of the result of applying the given function to each input record.
    <T> AggregateAPI     postField(Field<T> field, Function<? super Record,? extends T> mapper)
                         Defines (or redefines) the given field as the application of the given function to each output record.
    <T> AggregateAPI     route(BiConsumer<? super Record,? super BiConsumer<Field<?>,T>> router, Consumer<AggregateAPI.Route<T>> config)
                         Configures a sub-configurator that may define (or redefine) fields as aggregates over values emitted to those fields by the given routing function.

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Method Details

    • key

      public AggregateAPI key(Field<?> field)
      Defines a key as the lookup of the given field on each input record.
      Parameters:
      field - the field
      Returns:
      this configurator
      Throws:
      NoSuchElementException - if the stream header does not contain the given field
    • key

      public AggregateAPI key(Function<? super Record,?> mapper)
      Defines a key as the application of the given function to each input record.
      Parameters:
      mapper - a function to be applied to each input record
      Returns:
      this configurator
    • keyField

      public AggregateAPI keyField(Field<?> field)
      Defines (or redefines) the given field as a key that looks up the same field on each input record.
      Parameters:
      field - the field
      Returns:
      this configurator
      Throws:
      NoSuchElementException - if the stream header does not contain the given field
    • keyField

      public <T> AggregateAPI keyField(Field<T> field, Function<? super Record,? extends T> mapper)
      Defines (or redefines) the given field as a key that applies the given function to each input record.
      Type Parameters:
      T - the value type of the field
      Parameters:
      field - the field
      mapper - a function to be applied to each input record
      Returns:
      this configurator
    • aggField

      public <T> AggregateAPI aggField(Field<T> field, Collector<? super Record,?,? extends T> collector)
      Defines (or redefines) the given field as an aggregate over input records, using the given collector.
      Type Parameters:
      T - the value type of the field
      Parameters:
      field - the field
      collector - the collector that describes the aggregate operation over input records
      Returns:
      this configurator
    • keyFields

      public AggregateAPI keyFields(Field<?>... fields)
      Defines (or redefines) each of the given fields as keys that look up the same field on each input record.
      Parameters:
      fields - the fields
      Returns:
      this configurator
      Throws:
      NoSuchElementException - if any of the given fields is not in the stream header
    • keys

      public AggregateAPI keys(Field<?>... fields)
      Defines a key for each of the given fields as the lookup of that field on each input record.
      Parameters:
      fields - the fields
      Returns:
      this configurator
      Throws:
      NoSuchElementException - if any of the given fields is not in the stream header
    • postField

      public <T> AggregateAPI postField(Field<T> field, Function<? super Record,? extends T> mapper)
      Defines (or redefines) the given field as the application of the given function to each output record. The field may reference other output fields on the same record, including other post-fields, as long as it does not reference itself (including transitively). If a reference cycle is detected during evaluation, an IllegalStateException will be thrown.
      Type Parameters:
      T - the value type of the field
      Parameters:
      field - the field
      mapper - a function to apply to each output record
      Returns:
      this configurator
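      The cycle rule can be pictured as lazy, memoized evaluation. The sketch below is a hypothetical model of a single output record, not vinyl's implementation. Field names are plain strings, `PostFieldRecord` is an invented name, and post-field mappers read other fields through `get`. If a field is re-entered while it is still being evaluated, that indicates a cycle, which is reported as `IllegalStateException` as described above. Throwing `NoSuchElementException` for an unknown field name is this sketch's own choice.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.function.Function;

// Hypothetical model of one output record with post-fields (not the vinyl implementation).
// Post-fields are evaluated lazily and memoized; re-entering a field that is still being
// evaluated means a reference cycle, which is reported as an IllegalStateException.
final class PostFieldRecord {
    private final Map<String, Object> baseFields; // key and aggregate values
    private final Map<String, Function<PostFieldRecord, Object>> postFields;
    private final Map<String, Object> computed = new HashMap<>();
    private final Set<String> inProgress = new HashSet<>();

    PostFieldRecord(Map<String, Object> baseFields,
                    Map<String, Function<PostFieldRecord, Object>> postFields) {
        this.baseFields = baseFields;
        this.postFields = postFields;
    }

    Object get(String field) {
        if (baseFields.containsKey(field)) return baseFields.get(field);
        if (computed.containsKey(field)) return computed.get(field);
        Function<PostFieldRecord, Object> mapper = postFields.get(field);
        if (mapper == null) throw new NoSuchElementException(field);
        if (!inProgress.add(field)) {
            throw new IllegalStateException("Reference cycle involving field " + field);
        }
        try {
            Object value = mapper.apply(this);
            computed.put(field, value);
            return value;
        } finally {
            inProgress.remove(field);
        }
    }

    public static void main(String[] args) {
        Map<String, Function<PostFieldRecord, Object>> post = new HashMap<>();
        post.put("average", r -> ((Number) r.get("total")).doubleValue()
                / ((Number) r.get("count")).doubleValue());
        post.put("label", r -> "avg=" + r.get("average")); // depends on another post-field
        PostFieldRecord row = new PostFieldRecord(Map.of("total", 30L, "count", 4L), post);
        System.out.println(row.get("label")); // avg=7.5
    }
}
```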
    • keys

      public <T> AggregateAPI keys(Function<? super Record,? extends T> mapper, Consumer<AggregateAPI.Keys<T>> config)
      Configures a sub-configurator that may define (or redefine) keys in terms of the result of applying the given function to each input record. If no keys are defined by the sub-configurator, possibly due to later field redefinitions, the entire sub-configurator is discarded.
      Type Parameters:
      T - the result type of the function
      Parameters:
      mapper - a function to be applied to each input record
      config - a consumer of the sub-configurator
      Returns:
      this configurator
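      The value of a shared key mapper is that an intermediate result is computed once per record, and several keys are then derived from it. The sketch below is a JDK-only analogy rather than vinyl code. Each ISO date string is parsed once into a `LocalDate`, and year and month keys are derived from that single result. Together, the two keys form the composite grouping. `SharedKeyDemo` and its method are hypothetical names.

```java
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// JDK-only analogy for keys(mapper, config): each record is mapped once to an intermediate
// value (a parsed LocalDate), and two keys (year, month) are derived from that one result.
final class SharedKeyDemo {
    static Map<List<Integer>, Long> countByYearAndMonth(List<String> isoDates) {
        return isoDates.stream().collect(Collectors.groupingBy(
                text -> {
                    LocalDate date = LocalDate.parse(text); // intermediate result, once per record
                    return List.of(date.getYear(), date.getMonthValue()); // keys derived from it
                },
                Collectors.counting()));
    }

    public static void main(String[] args) {
        // Two partitions: (2024, 3) with 2 records and (2024, 4) with 1; map order unspecified.
        System.out.println(countByYearAndMonth(
                List.of("2024-03-01", "2024-03-15", "2024-04-02")));
    }
}
```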
    • aggs

      public <T> AggregateAPI aggs(Collector<? super Record,?,? extends T> collector, Consumer<AggregateAPI.Aggs<T>> config)
      Configures a sub-configurator that may define (or redefine) fields in terms of the result of aggregating the input records using the given collector. If no fields are defined by the sub-configurator, possibly due to later field redefinitions, the entire sub-configurator is discarded.
      Type Parameters:
      T - the result type of the aggregation
      Parameters:
      collector - the collector that describes the aggregate operation over input records
      config - a consumer of the sub-configurator
      Returns:
      this configurator
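      A single shared aggregation can feed several output fields. The sketch below shows this with the JDK alone and is not vinyl code. `Collectors.summarizingLong` aggregates the input once, and `Collectors.collectingAndThen` derives several values from that one `LongSummaryStatistics` result. The output names `minAmount`, `maxAmount` and `avgAmount` are hypothetical.

```java
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.stream.Collectors;

// JDK-only analogy for aggs(collector, config): the input is aggregated once by a shared
// collector (summarizingLong), and several output values are derived from that one result.
final class SharedAggregationDemo {
    static Map<String, Object> summarize(List<Long> amounts) {
        return amounts.stream().collect(Collectors.collectingAndThen(
                Collectors.summarizingLong(Long::longValue), // shared intermediate aggregation
                (LongSummaryStatistics stats) -> Map.<String, Object>of( // hypothetical field names
                        "minAmount", stats.getMin(),
                        "maxAmount", stats.getMax(),
                        "avgAmount", stats.getAverage())));
    }

    public static void main(String[] args) {
        System.out.println(summarize(List.of(10L, 5L, 15L)).get("avgAmount")); // 10.0
    }
}
```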
    • route

      public <T> AggregateAPI route(BiConsumer<? super Record,? super BiConsumer<Field<?>,T>> router, Consumer<AggregateAPI.Route<T>> config)
      Configures a sub-configurator that may define (or redefine) fields as aggregates over values emitted to those fields by the given routing function. If no fields are defined by the sub-configurator, possibly due to later field redefinitions, the entire sub-configurator is discarded.

      If the routing function emits to a field that is not defined by the sub-configurator, there is no effect. If the routing function emits multiple values to the same field, the field aggregates all of them.

      Routing is more general than, but can efficiently express, a SQL "pivot" operation. SQL pivot syntax is dialect-specific and not universally supported, but an example query might look something like:

      
           SELECT * FROM (
               SELECT VendorId, EmployeeId, PurchaseAmount
               FROM Purchases
           )
           PIVOT (
               AVG(PurchaseAmount)
               FOR EmployeeId IN (250 Emp1, 251 Emp2, 256 Emp3, 257 Emp4, 260 Emp5)
           )
       

      Analogous Java code (with no attempt to remove duplication) might look like:

      
           RecordStream stream = purchasesStream
               .aggregate(aggregate -> aggregate
                   .keyField(vendorId)
                   .<Long>route(
                       (record, sink) -> {
                           Long amount = record.get(purchaseAmount);
                           switch (record.get(employeeId)) {
                               case 250: sink.accept(emp1, amount); break;
                               case 251: sink.accept(emp2, amount); break;
                               case 256: sink.accept(emp3, amount); break;
                               case 257: sink.accept(emp4, amount); break;
                               case 260: sink.accept(emp5, amount); break;
                           }
                       },
                       route -> route
                           .aggField(emp1, Collectors.averagingLong(amount -> amount))
                           .aggField(emp2, Collectors.averagingLong(amount -> amount))
                           .aggField(emp3, Collectors.averagingLong(amount -> amount))
                           .aggField(emp4, Collectors.averagingLong(amount -> amount))
                           .aggField(emp5, Collectors.averagingLong(amount -> amount))
                   )
               );
       

      Or, with some refactoring:

      
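           // Map.of has unspecified iteration order; a TreeMap iterates by ascending id, so the
           // fields are defined (and appear in the header) as emp1..emp5, as in the version above.
           Map<Integer, Field<Double>> empById = new TreeMap<>(Map.of(250, emp1, 251, emp2, 256, emp3, 257, emp4, 260, emp5));
      
           RecordStream stream = purchasesStream
               .aggregate(aggregate -> aggregate
                   .keyField(vendorId)
                   .<Long>route(
                       (record, sink) -> {
                           // Skip employees outside the pivot list, as the switch version does.
                           Field<Double> emp = empById.get(record.get(employeeId));
                           if (emp != null) {
                               sink.accept(emp, record.get(purchaseAmount));
                           }
                       },
                       route -> empById.forEach((id, emp) -> route.aggField(emp, Collectors.averagingLong(amount -> amount)))
                   )
               );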
       
      Type Parameters:
      T - the type of values emitted by the routing function
      Parameters:
      router - the routing function
      config - a consumer of the sub-configurator
      Returns:
      this configurator
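      The routing semantics above can be modelled with plain JDK collectors. The following is a hypothetical single-partition sketch, not vinyl's implementation. Fields are plain strings instead of `Field` objects, and the names `RouteDemo`, `Acc` and `Purchase` are illustrative. The sketch reproduces the documented behaviour: emissions to undefined fields have no effect, and repeated emissions to a field are all aggregated. `pivotExample` mirrors the pivot query above for two of its employees.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical single-partition model of routing (not the vinyl implementation). Fields are
// plain strings. Each defined field accumulates only the values emitted to it; emissions to
// undefined fields are ignored; repeated emissions to a field are all accumulated.
final class RouteDemo {
    // Wraps one collector's mutable container, capturing its otherwise-wildcard type.
    private interface Acc<T> {
        void add(T value);
        Object finish();

        static <T, A> Acc<T> of(Collector<? super T, A, ?> collector) {
            A container = collector.supplier().get();
            BiConsumer<A, ? super T> accumulator = collector.accumulator();
            return new Acc<T>() {
                public void add(T value) { accumulator.accept(container, value); }
                public Object finish() { return collector.finisher().apply(container); }
            };
        }
    }

    static <R, T> Map<String, Object> route(
            List<R> records,
            BiConsumer<? super R, ? super BiConsumer<String, T>> router,
            Map<String, Collector<? super T, ?, ?>> fields) {
        Map<String, Acc<T>> accs = new LinkedHashMap<>();
        fields.forEach((name, collector) -> accs.put(name, Acc.of(collector)));
        BiConsumer<String, T> sink = (name, value) -> {
            Acc<T> acc = accs.get(name);
            if (acc != null) acc.add(value); // undefined field: no effect
        };
        for (R row : records) router.accept(row, sink);
        Map<String, Object> out = new LinkedHashMap<>();
        accs.forEach((name, acc) -> out.put(name, acc.finish()));
        return out;
    }

    // Hypothetical input row for the pivot example.
    record Purchase(long employeeId, long amount) {}

    static Map<String, Object> pivotExample() {
        Map<Long, String> empById = Map.of(250L, "emp1", 251L, "emp2");
        Map<String, Collector<? super Long, ?, ?>> fields = new LinkedHashMap<>();
        fields.put("emp1", Collectors.averagingLong(Long::longValue));
        fields.put("emp2", Collectors.averagingLong(Long::longValue));
        List<Purchase> purchases = List.of(new Purchase(250, 100), new Purchase(250, 300),
                new Purchase(251, 50), new Purchase(999, 10));
        return RouteDemo.<Purchase, Long>route(purchases,
                // Employee 999 is routed to "unmapped", which is not defined, so it is ignored.
                (p, sink) -> sink.accept(empById.getOrDefault(p.employeeId(), "unmapped"), p.amount()),
                fields);
    }

    public static void main(String[] args) {
        System.out.println(pivotExample()); // {emp1=200.0, emp2=50.0}
    }
}
```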