Skip to content

Commit edb042d

Browse files
authored
Collecting pipeline statistics (#1556)
* Fixed bug related to collecting datetime statistics * Updated dsl definitions * Fixed failing tests
1 parent 158e344 commit edb042d

12 files changed

Lines changed: 244 additions & 31 deletions

File tree

src/cli/src/Flow/CLI/Command/FileAnalyzeCommand.php

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
namespace Flow\CLI\Command;
66

7-
use function Flow\CLI\{option_int, option_int_nullable};
8-
use function Flow\ETL\DSL\{df};
7+
use function Flow\CLI\{option_bool, option_int, option_int_nullable};
8+
use function Flow\ETL\DSL\{analyze, df};
99
use Flow\CLI\Arguments\{FilePathArgument};
1010
use Flow\CLI\Command\Traits\{CSVOptions, ConfigOptions, JSONOptions, ParquetOptions, StatisticsOptions, XMLOptions};
1111
use Flow\CLI\Factory\ExtractorFactory;
@@ -82,11 +82,21 @@ protected function execute(InputInterface $input, OutputInterface $output) : int
8282
$progress = $style->createProgressBar();
8383
$progress->setFormat('Analyzed Rows: %current% %bar%');
8484

85+
$analyze = analyze();
86+
87+
if (option_bool('stats-schema', $input)) {
88+
$analyze->withSchema();
89+
}
90+
91+
if (option_bool('stats-columns', $input)) {
92+
$analyze->withColumnStatistics();
93+
}
94+
8595
$report = $df->run(
8696
static function (Rows $rows) use ($progress) : void {
8797
$progress->advance($rows->count());
8898
},
89-
analyze: true
99+
analyze: $analyze
90100
);
91101

92102
if ($report === null) {

src/cli/src/Flow/CLI/Command/PipelineRunCommand.php

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace Flow\CLI\Command;
66

77
use function Flow\CLI\option_bool;
8+
use function Flow\ETL\DSL\analyze;
89
use Flow\CLI\Arguments\FilePathArgument;
910
use Flow\CLI\Command\Traits\{ConfigOptions, StatisticsOptions};
1011
use Flow\CLI\Formatter\PipelineReportFormatter;
@@ -60,12 +61,22 @@ public function execute(InputInterface $input, OutputInterface $output) : int
6061
{
6162
$style = new SymfonyStyle($input, $output);
6263

64+
$analyze = option_bool('analyze', $input) ? analyze() : false;
65+
66+
if ($analyze && option_bool('stats-schema', $input)) {
67+
$analyze->withSchema();
68+
}
69+
70+
if ($analyze && option_bool('stats-columns', $input)) {
71+
$analyze->withColumnStatistics();
72+
}
73+
6374
try {
6475
ob_start();
6576
$df = match ($this->pipelinePath->extension()) {
6677
'php' => (new PipelineFactory($this->pipelinePath))->fromPHP(),
6778
};
68-
$report = $df->run(analyze: option_bool('analyze', $input));
79+
$report = $df->run(analyze: $analyze);
6980

7081
$style->writeln(ob_get_clean());
7182

src/cli/src/Flow/CLI/Formatter/PipelineReportFormatter.php

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ public function __construct(private Report $report, private SymfonyStyle $style,
1919

2020
public function format() : void
2121
{
22-
if (option_bool('stats-schema', $this->input)) {
22+
$schema = $this->report->schema();
23+
24+
if (option_bool('stats-schema', $this->input) && $schema) {
2325
$this->style->newLine();
2426
$this->style->section('Schema');
2527

2628
$normalizedSchema = [];
2729

28-
foreach ($this->report->schema()->definitions() as $definition) {
30+
foreach ($schema->definitions() as $definition) {
2931
$normalizedSchema[] = [
3032
'name' => $definition->entry()->name(),
3133
'type' => $definition->type()->toString(),
@@ -41,12 +43,14 @@ public function format() : void
4143
->render();
4244
}
4345

44-
if (option_bool('stats-columns', $this->input)) {
46+
$columnsStatistics = $this->report->statistics()->columns;
47+
48+
if (option_bool('stats-columns', $this->input) && $columnsStatistics) {
4549
$normalizedColumnStatistics = [];
4650

4751
$valueFormatter = new ValueFormatter();
4852

49-
foreach ($this->report->statistics()->columns->all() as $columnStatistics) {
53+
foreach ($columnsStatistics->all() as $columnStatistics) {
5054
$normalizedColumnStatistics[] = [
5155
'name' => $columnStatistics->name(),
5256
'type' => $columnStatistics->type()->toString(),
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\ETL;
6+
7+
final class Analyze
8+
{
9+
private bool $collectColumnStatistics = false;
10+
11+
private bool $collectSchema = false;
12+
13+
public function __construct()
14+
{
15+
}
16+
17+
public function collectColumnStatistics() : bool
18+
{
19+
return $this->collectColumnStatistics;
20+
}
21+
22+
public function collectSchema() : bool
23+
{
24+
return $this->collectSchema;
25+
}
26+
27+
public function withColumnStatistics() : self
28+
{
29+
$this->collectColumnStatistics = true;
30+
31+
return $this;
32+
}
33+
34+
public function withSchema() : self
35+
{
36+
$this->collectSchema = true;
37+
38+
return $this;
39+
}
40+
}

src/core/etl/src/Flow/ETL/DSL/functions.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@
104104
use Flow\ETL\Row\Schema\Formatter\ASCIISchemaFormatter;
105105
use Flow\ETL\Row\Schema\{Definition, Matcher\EvolvingSchemaMatcher, Matcher\StrictSchemaMatcher, SchemaFormatter};
106106
use Flow\ETL\Row\{Entry, EntryReference, Reference, References, Schema};
107-
use Flow\ETL\{Attribute\DocumentationDSL,
107+
use Flow\ETL\{Analyze,
108+
Attribute\DocumentationDSL,
108109
Attribute\DocumentationExample,
109110
Attribute\Module,
110111
Attribute\Type as DSLType,
@@ -1712,3 +1713,9 @@ function constraint_unique(string $reference, string ...$references) : UniqueCon
17121713
{
17131714
return new UniqueConstraint($reference, ...$references);
17141715
}
1716+
1717+
#[DocumentationDSL(module: Module::CORE, type: DSLType::HELPER)]
1718+
function analyze() : Analyze
1719+
{
1720+
return new Analyze();
1721+
}

src/core/etl/src/Flow/ETL/DataFrame.php

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
namespace Flow\ETL;
66

7-
use function Flow\ETL\DSL\{refs, to_output};
7+
use function Flow\ETL\DSL\{analyze, refs, to_output};
88
use Flow\ETL\DataFrame\GroupedDataFrame;
99
use Flow\ETL\Dataset\{Report, Statistics};
1010
use Flow\ETL\Exception\{InvalidArgumentException, RuntimeException};
@@ -692,20 +692,29 @@ public function rows(Transformer|Transformation $transformer) : self
692692
/**
693693
* @trigger
694694
*
695+
* When analyzing pipeline execution we can chose to collect various metrics through analyze()->with*() method
696+
*
697+
* - column statistics - analyze()->withColumnStatistics()
698+
* - schema - analyze()->withSchema()
699+
*
695700
* @param null|callable(Rows $rows, FlowContext $context): void $callback
696-
* @param bool $analyze - when set to true, run will return Report
701+
* @param Analyze|bool $analyze - when set run will return Report
702+
*
703+
* @return ($analyze is Analyze|true ? Report : null)
697704
*/
698-
public function run(?callable $callback = null, bool $analyze = false) : ?Report
705+
public function run(?callable $callback = null, bool|Analyze $analyze = false) : ?Report
699706
{
700707
$clone = clone $this;
701708

702709
$totalRows = 0;
703-
$schema = new Schema();
710+
711+
$analyze = $analyze === true ? analyze() : $analyze;
704712

705713
if ($analyze) {
706714
$startedAt = $this->context->config->clock()->now();
707715
$startTime = Statistics\HighResolutionTime::now();
708-
$columnStatistics = new Statistics\Columns();
716+
$columnStatistics = $analyze->collectColumnStatistics() ? new Statistics\Columns() : null;
717+
$schema = $analyze->collectSchema() ? new Schema() : null;
709718
}
710719

711720
foreach ($clone->pipeline->process($clone->context) as $rows) {
@@ -714,12 +723,17 @@ public function run(?callable $callback = null, bool $analyze = false) : ?Report
714723
}
715724

716725
if ($analyze) {
717-
$schema = $schema->merge($rows->schema());
718726
$totalRows += $rows->count();
719727

720-
foreach ($rows->all() as $row) {
721-
foreach ($row->entries()->all() as $entry) {
722-
$columnStatistics->add($entry);
728+
if ($schema !== null) {
729+
$schema = $schema->merge($rows->schema());
730+
}
731+
732+
if ($columnStatistics !== null) {
733+
foreach ($rows->all() as $row) {
734+
foreach ($row->entries()->all() as $entry) {
735+
$columnStatistics->add($entry);
736+
}
723737
}
724738
}
725739
}

src/core/etl/src/Flow/ETL/Dataset/Report.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@
99
final readonly class Report
1010
{
1111
public function __construct(
12-
private Schema $schema,
12+
private ?Schema $schema,
1313
private Statistics $statistics,
1414
) {
1515

1616
}
1717

18-
public function schema() : Schema
18+
public function schema() : ?Schema
1919
{
2020
return $this->schema;
2121
}

src/core/etl/src/Flow/ETL/Dataset/Statistics.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
public function __construct(
1212
private int $totalRows,
1313
public ExecutionTime $executionTime,
14-
public Columns $columns,
14+
public ?Columns $columns,
1515
) {
1616
}
1717

src/core/etl/src/Flow/ETL/Dataset/Statistics/Column.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,14 @@ public function calculate(Entry $entry) : void
8484
return;
8585
}
8686

87-
if ($entry instanceof Entry\DateTimeEntry || $entry instanceof Entry\DateEntry || $entry instanceof Entry\IntegerEntry || $entry instanceof Entry\FloatEntry || $entry instanceof Entry\BooleanEntry) {
87+
if ($entry instanceof Entry\DateEntry || $entry instanceof Entry\DateTimeEntry) {
88+
$this->max = \max($this->max ?? $value, $value);
89+
$this->min = \min($this->min ?? $value, $value);
90+
91+
return;
92+
}
93+
94+
if ($entry instanceof Entry\IntegerEntry || $entry instanceof Entry\FloatEntry || $entry instanceof Entry\BooleanEntry) {
8895
$this->min = \min($this->min ?? $value, $value);
8996
$this->max = \max($this->max ?? $value, $value);
9097
}

0 commit comments

Comments
 (0)