Class AggregateAPI
record-stream.
An aggregate operation may define zero or more keys that together group the input stream into non-overlapping partitions. The keys may be retained as output fields or discarded. Each partition captures one or more input records and produces exactly one output record. In the special case where no keys are used and the stream is empty, the default partition captures no input records but still produces exactly one output record.
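The partitioning rule can be illustrated with plain JDK streams. This is a minimal sketch, not the record-stream API: rows are modeled as `Map<String, Object>`, the single output per partition is just a row count, and `PartitionSketch` and `countByKeys` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class PartitionSketch {
    // Groups rows into non-overlapping partitions by the values of the given key columns
    // and produces exactly one output value (here, a row count) per partition.
    static Map<List<Object>, Long> countByKeys(List<Map<String, Object>> rows, List<String> keys) {
        if (keys.isEmpty()) {
            // No keys: every row falls into one default partition, which still
            // yields one output even when the input is empty (a count of 0).
            return Map.of(List.of(), (long) rows.size());
        }
        // With keys: one partition per distinct combination of key values.
        return rows.stream().collect(Collectors.groupingBy(
                row -> keys.stream().map(row::get).collect(Collectors.toList()),
                LinkedHashMap::new,
                Collectors.counting()));
    }
}
```

With no keys and an empty input, `countByKeys` returns a single entry with count 0, mirroring the default partition; with a key and an empty input it returns no partitions at all.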
An aggregate operation may define fields as aggregate functions. An aggregate function is applied per partition and returns one value for that partition, which becomes the value of the field on the partition's output record.
Fields defined on the configurator (or on any sub-configurator) become fields of the resulting header.
The header fields appear in order of definition on the configurator(s). A field may be redefined on the configurator(s),
in which case its definition is replaced but its original position in the header is retained.
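The ordering rule behaves like re-inserting a key into a `java.util.LinkedHashMap`, which replaces the value without moving the key. A small sketch under that assumption; `HeaderSketch` and its string "definitions" are hypothetical and only model the header bookkeeping.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class HeaderSketch {
    // Hypothetical header model: field name -> definition (a description string here).
    private final Map<String, String> definitions = new LinkedHashMap<>();

    // Redefining an existing field replaces its definition; LinkedHashMap keeps the
    // field at its original insertion position, matching the documented header order.
    HeaderSketch define(String field, String definition) {
        definitions.put(field, definition);
        return this;
    }

    List<String> fields() {
        return new ArrayList<>(definitions.keySet());
    }

    String definitionOf(String field) {
        return definitions.get(field);
    }
}
```

For example, defining `vendor`, then `total`, then redefining `vendor` leaves the header as `[vendor, total]`, with `vendor` carrying its latest definition.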
Nested Class Summary
Modifier and Type | Class | Description
class | AggregateAPI.Aggs<T> | A sub-configurator used to define fields of an aggregate operation that depend on a common intermediate aggregation result.
class | AggregateAPI.Keys<T> | A sub-configurator used to define keys of an aggregate operation that depend on a common intermediate result.
class | AggregateAPI.Route<T> | A sub-configurator used to define fields of an aggregate operation that aggregate over values emitted to them by a shared routing function.
Method Summary
Modifier and Type | Method | Description
<T> AggregateAPI | aggField(field, collector) | Defines (or redefines) the given field as an aggregate over input records, using the given collector.
<T> AggregateAPI | aggs(Collector<? super Record, ?, ? extends T> collector, Consumer<AggregateAPI.Aggs<T>> config) | Configures a sub-configurator that may define (or redefine) fields in terms of the result of aggregating the input records using the given collector.
AggregateAPI | key(field) | Defines a key as the lookup of the given field on each input record.
AggregateAPI | key(mapper) | Defines a key as the application of the given function to each input record.
AggregateAPI | keyField(field) | Defines (or redefines) the given field as a key that looks up the same field on each input record.
<T> AggregateAPI | keyField(field, mapper) | Defines (or redefines) the given field as a key that applies the given function to each input record.
AggregateAPI | keyFields(fields) | Defines (or redefines) each of the given fields as keys that look up the same field on each input record.
AggregateAPI | keys(fields) | Defines a key from each of the given fields, as the lookup of that field on each input record.
<T> AggregateAPI | keys(Function<? super Record, ? extends T> mapper, Consumer<AggregateAPI.Keys<T>> config) | Configures a sub-configurator that may define (or redefine) keys in terms of the result of applying the given function to each input record.
<T> AggregateAPI | postField(field, mapper) | Defines (or redefines) the given field as the application of the given function to each output record.
<T> AggregateAPI | route(BiConsumer<? super Record, ? super BiConsumer<Field<?>, T>> router, Consumer<AggregateAPI.Route<T>> config) | Configures a sub-configurator that may define (or redefine) fields as aggregates over values emitted to those fields by the given routing function.
Method Details
key
Defines a key as the lookup of the given field on each input record.
Parameters:
field - the field
Returns:
this configurator
Throws:
NoSuchElementException - if the stream header does not contain the given field
key
Defines a key as the application of the given function to each input record.
Parameters:
mapper - a function to be applied to each input record
Returns:
this configurator
keyField
Defines (or redefines) the given field as a key that looks up the same field on each input record.
Parameters:
field - the field
Returns:
this configurator
Throws:
NoSuchElementException - if the stream header does not contain the given field
keyField
Defines (or redefines) the given field as a key that applies the given function to each input record.
Type Parameters:
T - the value type of the field
Parameters:
field - the field
mapper - a function to be applied to each input record
Returns:
this configurator
aggField
Defines (or redefines) the given field as an aggregate over input records, using the given collector.
Type Parameters:
T - the value type of the field
Parameters:
field - the field
collector - the collector that describes the aggregate operation over input records
Returns:
this configurator
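When an aggregate depends on a single field's values, one way to build a record-level collector is to adapt a value collector with `Collectors.mapping`. The sketch uses a hypothetical `Purchase` class in place of `Record`, so it depends only on the JDK; `AggFieldSketch` and `totalByVendor` are likewise hypothetical, and whether the library offers its own helper for this is not stated here.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collector;
import java.util.stream.Collectors;

final class AggFieldSketch {
    // Hypothetical stand-in for an input record with a vendor key and an amount field.
    static final class Purchase {
        final int vendorId;
        final long amount;

        Purchase(int vendorId, long amount) {
            this.vendorId = vendorId;
            this.amount = amount;
        }
    }

    // A collector over whole records, obtained by adapting a collector over one field's values.
    // It is analogous to the collector passed to aggField, but typed over Purchase instead of Record.
    static final Collector<Purchase, ?, Long> TOTAL_AMOUNT =
            Collectors.mapping((Purchase p) -> p.amount, Collectors.summingLong(Long::longValue));

    // Applies the record collector once per partition (here, one partition per vendor).
    static Map<Integer, Long> totalByVendor(List<Purchase> purchases) {
        return purchases.stream()
                .collect(Collectors.groupingBy(p -> p.vendorId, TreeMap::new, TOTAL_AMOUNT));
    }
}
```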
keyFields
Defines (or redefines) each of the given fields as keys that look up the same field on each input record.
Parameters:
fields - the fields
Returns:
this configurator
Throws:
NoSuchElementException - if any of the given fields is missing from the stream header
keys
Defines a key from each of the given fields, as the lookup of that field on each input record.
Parameters:
fields - the fields
Returns:
this configurator
Throws:
NoSuchElementException - if any of the given fields is missing from the stream header
postField
Defines (or redefines) the given field as the application of the given function to each output record. The field may reference other output fields on the same record, including other post-fields, as long as it does not reference itself (directly or transitively). If a reference cycle is detected during evaluation, an IllegalStateException is thrown.
Type Parameters:
T - the value type of the field
Parameters:
field - the field
mapper - a function to apply to each output record
Returns:
this configurator
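One way to obtain the documented cycle behavior is to track which post-fields are currently being evaluated and fail if one is re-entered. This is a sketch of that technique under stated assumptions, not the library's implementation; `PostFieldSketch` and the lookup-function shape of a post-field definition are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.function.Function;

final class PostFieldSketch {
    // Values already present on the output record (e.g. keys and aggregates).
    private final Map<String, Object> base;
    // Hypothetical post-field definitions: each receives a lookup for other fields on the same record.
    private final Map<String, Function<Function<String, Object>, Object>> postFields = new HashMap<>();

    PostFieldSketch(Map<String, Object> base) {
        this.base = base;
    }

    PostFieldSketch define(String name, Function<Function<String, Object>, Object> fn) {
        postFields.put(name, fn);
        return this;
    }

    // Evaluates a field, memoizing finished post-fields and tracking those in progress,
    // so that a direct or transitive self-reference fails instead of recursing forever.
    Object get(String name) {
        return get(name, new HashMap<>(), new HashSet<>());
    }

    private Object get(String name, Map<String, Object> done, Set<String> inProgress) {
        if (base.containsKey(name)) {
            return base.get(name);
        }
        if (done.containsKey(name)) {
            return done.get(name);
        }
        Function<Function<String, Object>, Object> fn = postFields.get(name);
        if (fn == null) {
            throw new NoSuchElementException("No such field: " + name);
        }
        if (!inProgress.add(name)) {
            throw new IllegalStateException("Reference cycle detected at field: " + name);
        }
        Object value = fn.apply(other -> get(other, done, inProgress));
        inProgress.remove(name);
        done.put(name, value);
        return value;
    }
}
```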
keys
public <T> AggregateAPI keys(Function<? super Record, ? extends T> mapper, Consumer<AggregateAPI.Keys<T>> config)
Configures a sub-configurator that may define (or redefine) keys in terms of the result of applying the given function to each input record. If no keys are defined by the sub-configurator (possibly because of later field redefinitions), the entire sub-configurator is discarded.
Type Parameters:
T - the result type of the function
Parameters:
mapper - a function to be applied to each input record
config - a consumer of the sub-configurator
Returns:
this configurator
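A plain-JDK analogue of deriving several keys from one intermediate result: each ISO date string is parsed once with `LocalDate.parse`, and both the year key and the month key read that parsed value. `KeysSketch` and `countByYearMonth` are hypothetical; in the library, the parsing step would play the role of `mapper` and the keys would be defined through the sub-configurator.

```java
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class KeysSketch {
    // Counts dates per (year, month) partition.
    static Map<List<Integer>, Long> countByYearMonth(List<String> isoDates) {
        return isoDates.stream()
                .map(LocalDate::parse) // shared intermediate result, computed once per input
                .collect(Collectors.groupingBy(
                        d -> List.of(d.getYear(), d.getMonthValue()), // two keys from one intermediate
                        Collectors.counting()));
    }
}
```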
aggs
public <T> AggregateAPI aggs(Collector<? super Record, ?, ? extends T> collector, Consumer<AggregateAPI.Aggs<T>> config)
Configures a sub-configurator that may define (or redefine) fields in terms of the result of aggregating the input records using the given collector. If no fields are defined by the sub-configurator (possibly because of later field redefinitions), the entire sub-configurator is discarded.
Type Parameters:
T - the result type of the aggregation
Parameters:
collector - the collector that describes the aggregate operation over input records
config - a consumer of the sub-configurator
Returns:
this configurator
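In plain JDK terms, `Collectors.summarizingLong` is an example of one intermediate aggregation per partition from which several output values can be read. The sketch below computes one `LongSummaryStatistics` per vendor and derives count, sum, and max from it; `AggsSketch`, its `Purchase` class, and the list-shaped output are hypothetical stand-ins, not the library's types.

```java
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class AggsSketch {
    // Hypothetical stand-in for an input record.
    static final class Purchase {
        final int vendorId;
        final long amount;

        Purchase(int vendorId, long amount) {
            this.vendorId = vendorId;
            this.amount = amount;
        }
    }

    // One intermediate aggregation per vendor, then several outputs read from it: [count, sum, max].
    static Map<Integer, List<Long>> countSumMaxByVendor(List<Purchase> purchases) {
        Map<Integer, LongSummaryStatistics> stats = purchases.stream()
                .collect(Collectors.groupingBy(p -> p.vendorId, TreeMap::new,
                        Collectors.summarizingLong(p -> p.amount)));
        Map<Integer, List<Long>> out = new TreeMap<>();
        stats.forEach((vendor, s) -> out.put(vendor, List.of(s.getCount(), s.getSum(), s.getMax())));
        return out;
    }
}
```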
route
public <T> AggregateAPI route(BiConsumer<? super Record, ? super BiConsumer<Field<?>, T>> router, Consumer<AggregateAPI.Route<T>> config)
Configures a sub-configurator that may define (or redefine) fields as aggregates over values emitted to those fields by the given routing function. If no fields are defined by the sub-configurator (possibly because of later field redefinitions), the entire sub-configurator is discarded.
If the routing function emits to a field that is not defined by the sub-configurator, there is no effect. If the routing function emits multiple values to the same field, each value contributes to that field's aggregate.
Routing is more general than, but can efficiently express, a SQL "pivot" operation. SQL pivot syntax is dialect-specific and not universally supported, but an example query might look something like:
    SELECT *
    FROM (
        SELECT VendorId, EmployeeId, PurchaseAmount
        FROM Purchases
    )
    PIVOT (
        AVG(PurchaseAmount)
        FOR EmployeeId IN (250 Emp1, 251 Emp2, 256 Emp3, 257 Emp4, 260 Emp5)
    )
Analogous Java code (with no attempt to remove duplication) might look like:
    RecordStream stream = purchasesStream
        .aggregate(aggregate -> aggregate
            .keyField(vendorId)
            .<Long>route(
                // Router: emit each purchase amount to the field for its employee.
                (record, sink) -> {
                    Long amount = record.get(purchaseAmount);
                    switch (record.get(employeeId)) {
                        case 250: sink.accept(emp1, amount); break;
                        case 251: sink.accept(emp2, amount); break;
                        case 256: sink.accept(emp3, amount); break;
                        case 257: sink.accept(emp4, amount); break;
                        case 260: sink.accept(emp5, amount); break;
                    }
                },
                // Each routed field averages the amounts emitted to it.
                route -> route
                    .aggField(emp1, Collectors.averagingLong(amount -> amount))
                    .aggField(emp2, Collectors.averagingLong(amount -> amount))
                    .aggField(emp3, Collectors.averagingLong(amount -> amount))
                    .aggField(emp4, Collectors.averagingLong(amount -> amount))
                    .aggField(emp5, Collectors.averagingLong(amount -> amount))
            )
        );
Or, with some refactoring:
    Map<Integer, Field<Double>> empById = Map.of(250, emp1, 251, emp2, 256, emp3, 257, emp4, 260, emp5);
    RecordStream stream = purchasesStream
        .aggregate(aggregate -> aggregate
            .keyField(vendorId)
            .<Long>route(
                (record, sink) -> {
                    Field<Double> emp = empById.get(record.get(employeeId));
                    // Skip employees outside the pivot, as the switch version does.
                    if (emp != null) {
                        sink.accept(emp, record.get(purchaseAmount));
                    }
                },
                route -> empById.forEach((id, emp) -> route.aggField(emp, Collectors.averagingLong(amount -> amount)))
            )
        );
Type Parameters:
T - the type of values emitted by the routing function
Parameters:
router - the routing function
config - a consumer of the sub-configurator
Returns:
this configurator
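The routing rules above (undeclared targets ignored, repeated emissions each counted) can be sketched with the JDK alone for a single partition, i.e. with no keys. This is an illustration of the semantics, not how the library implements `route`; `RouteSketch`, `routeAndAverage`, and the `int[]` row layout `{employeeId, amount}` are hypothetical, and fields are named by strings instead of `Field` objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

final class RouteSketch {
    // Hands each row to the router together with a sink. Values emitted to a declared
    // field are collected; emissions to undeclared names (or null) are ignored; every
    // emission to a field contributes one value to that field's average.
    static Map<String, Double> routeAndAverage(List<int[]> rows, Set<String> declaredFields,
                                               BiConsumer<int[], BiConsumer<String, Long>> router) {
        Map<String, List<Long>> buckets = new HashMap<>();
        for (String declared : declaredFields) {
            buckets.put(declared, new ArrayList<>());
        }
        BiConsumer<String, Long> sink = (field, value) -> {
            List<Long> bucket = buckets.get(field);
            if (bucket != null) {
                bucket.add(value); // undeclared field: no effect
            }
        };
        for (int[] row : rows) {
            router.accept(row, sink);
        }
        Map<String, Double> averages = new TreeMap<>();
        buckets.forEach((field, values) ->
                averages.put(field, values.stream().collect(Collectors.averagingLong(Long::longValue))));
        return averages;
    }
}
```

A declared field that receives no emissions averages to 0.0 here, because that is what `Collectors.averagingLong` returns for no elements; the library's behavior for such fields depends on the collector it is given.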