1414 * limitations under the License.
1515 */
1616
17- package sleeper .systemtest .drivers .statemachine ;
17+ package sleeper .systemtest .drivers .ingest ;
1818
19+ import io .fabric8 .kubernetes .api .model .Pod ;
20+ import io .fabric8 .kubernetes .api .model .PodList ;
21+ import io .fabric8 .kubernetes .api .model .batch .v1 .Job ;
22+ import io .fabric8 .kubernetes .api .model .batch .v1 .JobList ;
23+ import io .fabric8 .kubernetes .client .KubernetesClient ;
24+ import org .slf4j .Logger ;
25+ import org .slf4j .LoggerFactory ;
1926import software .amazon .awssdk .services .sfn .SfnClient ;
2027import software .amazon .awssdk .services .sfn .model .DescribeExecutionResponse ;
2128
2229import sleeper .bulkimport .core .statemachine .DeriveJobExecutionName ;
30+ import sleeper .core .properties .instance .InstanceProperties ;
2331import sleeper .systemtest .drivers .util .SystemTestClients ;
2432import sleeper .systemtest .dsl .ingest .EksBulkImportDriver ;
2533import sleeper .systemtest .dsl .ingest .SentIngestJobsContext ;
2634import sleeper .systemtest .dsl .instance .SystemTestInstanceContext ;
2735
2836import java .util .List ;
2937
38+ import static sleeper .core .properties .instance .CdkDefinedInstanceProperty .BULK_IMPORT_EKS_NAMESPACE ;
3039import static sleeper .core .properties .instance .CdkDefinedInstanceProperty .BULK_IMPORT_EKS_STATE_MACHINE_ARN ;
3140import static sleeper .core .properties .table .TableProperty .TABLE_ID ;
3241
3544 * table IDs using DeriveJobExecutionName, then calls the Step Functions API to retrieve each execution's status.
3645 */
3746public class AwsEksBulkImportDriver implements EksBulkImportDriver {
47+ private static final Logger LOGGER = LoggerFactory .getLogger (AwsEksBulkImportDriver .class );
48+
3849 private final SystemTestInstanceContext instance ;
3950 private final SentIngestJobsContext sentJobs ;
4051 private final SfnClient sfnClient ;
52+ private final KubernetesClientProvider k8sProvider ;
4153
4254 public AwsEksBulkImportDriver (SystemTestInstanceContext instance , SentIngestJobsContext sentJobs , SystemTestClients clients ) {
55+ this (instance , sentJobs , clients .getSfn (), clients ::createKubernetesClient );
56+ }
57+
58+ public AwsEksBulkImportDriver (SystemTestInstanceContext instance , SentIngestJobsContext sentJobs , SfnClient sfnClient , KubernetesClientProvider k8sProvider ) {
4359 this .instance = instance ;
4460 this .sentJobs = sentJobs ;
45- this .sfnClient = clients .getSfn ();
61+ this .sfnClient = sfnClient ;
62+ this .k8sProvider = k8sProvider ;
4663 }
4764
4865 @ Override
@@ -54,7 +71,36 @@ public List<String> getExecutionStatuses() {
5471 String executionName = DeriveJobExecutionName .jobExecutionName (tableId , jobId );
5572 String executionArn = stateMachineArn .replace (":stateMachine:" , ":execution:" ) + ":" + executionName ;
5673 DescribeExecutionResponse response = sfnClient .describeExecution (req -> req .executionArn (executionArn ));
74+ LOGGER .info ("Found execution for job {}: {}" , jobId , response );
75+ if (response .error () != null ) {
76+ LOGGER .info ("Error: {}" , response .error ());
77+ LOGGER .info ("Cause: {}" , response .cause ());
78+ }
5779 return response .statusAsString ();
5880 }).toList ();
5981 }
82+
83+ @ Override
84+ public List <String > getPods () {
85+ InstanceProperties properties = instance .getInstanceProperties ();
86+ PodList list = k8sProvider .getClient (properties ).pods ()
87+ .inNamespace (properties .get (BULK_IMPORT_EKS_NAMESPACE ))
88+ .list ();
89+ LOGGER .info ("Found pods in Spark namespace: {}" , list );
90+ return list .getItems ().stream ().map (Pod ::toString ).toList ();
91+ }
92+
93+ @ Override
94+ public List <String > getJobs () {
95+ InstanceProperties properties = instance .getInstanceProperties ();
96+ JobList list = k8sProvider .getClient (properties ).batch ().v1 ().jobs ()
97+ .inNamespace (properties .get (BULK_IMPORT_EKS_NAMESPACE ))
98+ .list ();
99+ LOGGER .info ("Found jobs in Spark namespace: {}" , list );
100+ return list .getItems ().stream ().map (Job ::toString ).toList ();
101+ }
102+
103+ public interface KubernetesClientProvider {
104+ KubernetesClient getClient (InstanceProperties instanceProperties );
105+ }
60106}
0 commit comments