Skip to content

Commit cc4fb7b

Browse files
authored
feat(Provider): priority destination and connection connectors (#6)
Added Connectors: - Destination Connector: Mysql - Connection Connecor
1 parent f9b5d23 commit cc4fb7b

25 files changed

Lines changed: 1050 additions & 270 deletions

api/connection.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package api
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
)
7+
8+
type ConnectionResourceID struct {
9+
ConnecctionID string `json:"connectionId"`
10+
}
11+
12+
type ConnectionResource struct {
13+
Name string `json:"name"`
14+
SourceID string `json:"sourceId,omitempty"`
15+
DestinationID string `json:"destinationId,omitempty"`
16+
ConnectionID string `json:"connectionId,omitempty"`
17+
// SyncCatalog interface{} `json:"syncCatalog"`
18+
DataResidency string `json:"dataResidency,omitempty"`
19+
NamespaceDefinition string `json:"namespaceDefinition,omitempty"`
20+
NamespaceFormat string `json:"namespaceFormat,omitempty"`
21+
NonBreakingSchemaUpdatesBehavior string `json:"nonBreakingSchemaUpdatesBehavior,omitempty"`
22+
Prefix string `json:"prefix,omitempty"`
23+
Status string `json:"status,omitempty"`
24+
Schedule ConnScheduleData `json:"schedule"`
25+
//OperatorConfiguration connOperatorConfig `json:"operator_configuration"`
26+
}
27+
type ConnScheduleData struct {
28+
ScheduleType string `json:"scheduleType"`
29+
CronExpression string `json:"cronExpression,omitempty"`
30+
}
31+
32+
type DiscoverSourceSchemaCatalog struct {
33+
SourceID string `json:"sourceId"`
34+
DisableCache bool `json:"disable_cache"`
35+
}
36+
37+
func (c *Client) CreateConnectionResource(payload ConnectionResource) (ConnectionResource, error) {
38+
// logger := fwhelpers.GetLogger()
39+
40+
method := "POST"
41+
url := c.Host + "/v1/connections"
42+
body, err := json.Marshal(payload)
43+
if err != nil {
44+
return ConnectionResource{}, err
45+
}
46+
47+
b, statusCode, _, _, err := c.doRequest(method, url, body, nil)
48+
if err != nil {
49+
return ConnectionResource{}, err
50+
}
51+
52+
connection := ConnectionResource{}
53+
if statusCode >= 200 && statusCode <= 299 {
54+
err = json.Unmarshal(b, &connection)
55+
return connection, err
56+
} else {
57+
msg, err := c.getAPIError(b)
58+
if err != nil {
59+
return connection, err
60+
} else {
61+
return connection, fmt.Errorf(msg)
62+
}
63+
}
64+
}
65+
66+
func (c *Client) ReadConnectionResource(connectionId string) (ConnectionResource, error) {
67+
// logger := fwhelpers.GetLogger()
68+
69+
method := "GET"
70+
url := c.Host + "/v1/connections/" + connectionId
71+
72+
b, statusCode, _, _, err := c.doRequest(method, url, []byte{}, nil)
73+
if err != nil {
74+
return ConnectionResource{}, err
75+
}
76+
connection := ConnectionResource{}
77+
if statusCode >= 200 && statusCode <= 299 {
78+
err = json.Unmarshal(b, &connection)
79+
return connection, err
80+
} else {
81+
msg, err := c.getAPIError(b)
82+
if err != nil {
83+
return connection, err
84+
} else {
85+
return connection, fmt.Errorf(msg)
86+
}
87+
}
88+
}
89+
90+
func (c *Client) UpdateConnectionResource(payload ConnectionResource) (ConnectionResource, error) {
91+
// logger := fwhelpers.GetLogger()
92+
93+
return ConnectionResource{}, fmt.Errorf("update resource is not supported")
94+
}
95+
96+
func (c *Client) DeleteConnectionResource(connectionId string) error {
97+
// logger := fwhelpers.GetLogger()
98+
99+
method := "DELETE"
100+
url := c.Host + "/v1/connections/" + connectionId
101+
102+
b, statusCode, _, _, err := c.doRequest(method, url, []byte{}, nil)
103+
if err != nil {
104+
return err
105+
}
106+
107+
if statusCode >= 200 && statusCode <= 299 {
108+
return nil
109+
} else {
110+
msg, err := c.getAPIError(b)
111+
if err != nil {
112+
return err
113+
} else {
114+
return fmt.Errorf(msg)
115+
}
116+
}
117+
}

api/destination_mysql.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package api
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
)
7+
8+
type DestinationMysqlID struct {
9+
DestinationId string `json:"destinationId"`
10+
}
11+
12+
type DestinationMysql struct {
13+
Name string `json:"name"`
14+
DestinationId string `json:"destinationId,omitempty"`
15+
WorkspaceId string `json:"workspaceId"`
16+
ConnectionConfiguration DestinationMysqlConnConfig `json:"configuration"`
17+
}
18+
19+
type DestinationMysqlConnConfig struct {
20+
DestinationType string `json:"destinationType"`
21+
Host string `json:"host"`
22+
Username string `json:"username"`
23+
Password string `json:"password"`
24+
Database string `json:"database"`
25+
Port int `json:"port"`
26+
}
27+
28+
func (c *Client) CreateMysqlDestination(payload DestinationMysql) (DestinationMysql, error) {
29+
// logger := fwhelpers.GetLogger()
30+
method := "POST"
31+
url := c.Host + "/v1/destinations"
32+
body, err := json.Marshal(payload)
33+
if err != nil {
34+
return DestinationMysql{}, err
35+
}
36+
b, statusCode, _, _, err := c.doRequest(method, url, body, nil)
37+
if err != nil {
38+
return DestinationMysql{}, err
39+
}
40+
41+
destination := DestinationMysql{}
42+
43+
if statusCode >= 200 && statusCode <= 299 {
44+
err = json.Unmarshal(b, &destination)
45+
return destination, err
46+
} else {
47+
msg, err := c.getAPIError(b)
48+
if err != nil {
49+
return destination, err
50+
} else {
51+
return destination, fmt.Errorf(msg)
52+
}
53+
}
54+
}
55+
56+
func (c *Client) ReadMysqlDestination(destinationId string) (DestinationMysql, error) {
57+
// logger := fwhelpers.GetLogger()
58+
59+
method := "GET"
60+
url := c.Host + "/v1/destinations/" + destinationId
61+
62+
b, statusCode, _, _, err := c.doRequest(method, url, []byte{}, nil)
63+
if err != nil {
64+
return DestinationMysql{}, err
65+
}
66+
67+
destination := DestinationMysql{}
68+
if statusCode >= 200 && statusCode <= 299 {
69+
err = json.Unmarshal(b, &destination)
70+
return destination, err
71+
} else {
72+
msg, err := c.getAPIError(b)
73+
if err != nil {
74+
return destination, err
75+
} else {
76+
return destination, fmt.Errorf(msg)
77+
}
78+
}
79+
}
80+
81+
func (c *Client) UpdateMysqlDestination(payload DestinationMysql) (DestinationMysql, error) {
82+
// logger := fwhelpers.GetLogger()
83+
84+
return DestinationMysql{}, fmt.Errorf("update resource is not supported")
85+
}
86+
87+
func (c *Client) DeleteMysqlDestination(destinationId string) error {
88+
// logger := fwhelpers.GetLogger()
89+
90+
method := "DELETE"
91+
url := c.Host + "/v1/destinations/" + destinationId
92+
sId := DestinationMysqlID{destinationId}
93+
body, err := json.Marshal(sId)
94+
if err != nil {
95+
return err
96+
}
97+
98+
b, statusCode, _, _, err := c.doRequest(method, url, body, nil)
99+
if err != nil {
100+
return err
101+
}
102+
103+
if statusCode >= 200 && statusCode <= 299 {
104+
return nil
105+
} else {
106+
msg, err := c.getAPIError(b)
107+
if err != nil {
108+
return err
109+
} else {
110+
return fmt.Errorf(msg)
111+
}
112+
}
113+
}

api/source_amplitude.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package api
33
import (
44
"encoding/json"
55
"fmt"
6-
7-
"github.com/zipstack/pct-plugin-framework/fwhelpers"
86
)
97

108
type SourceAmplitudeID struct {
@@ -81,10 +79,9 @@ func (c *Client) ReadAmplitudeSource(sourceId string) (SourceAmplitude, error) {
8179
}
8280

8381
func (c *Client) UpdateAmplitudeSource(payload SourceAmplitude) (SourceAmplitude, error) {
84-
logger := fwhelpers.GetLogger()
82+
// logger := fwhelpers.GetLogger()
8583

86-
logger.Print("[yellow]Update api is not yet exposed from Airbyte-Cloud[reset]")
87-
return SourceAmplitude{}, nil
84+
return SourceAmplitude{}, fmt.Errorf("update resource is not supported")
8885
}
8986

9087
func (c *Client) DeleteAmplitudeSource(sourceId string) error {

api/source_facebook_marketing.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package api
33
import (
44
"encoding/json"
55
"fmt"
6-
7-
"github.com/zipstack/pct-plugin-framework/fwhelpers"
86
)
97

108
type SourceFacebookMarketingID struct {
@@ -89,10 +87,9 @@ func (c *Client) ReadFacebookMarketingSource(sourceId string) (SourceFacebookMar
8987
}
9088

9189
func (c *Client) UpdateFacebookMarketingSource(payload SourceFacebookMarketing) (SourceFacebookMarketing, error) {
92-
logger := fwhelpers.GetLogger()
90+
// logger := fwhelpers.GetLogger()
9391

94-
logger.Print("[yellow]Update api is not yet exposed from Airbyte-Cloud[reset]")
95-
return SourceFacebookMarketing{}, nil
92+
return SourceFacebookMarketing{}, fmt.Errorf("update resource is not supported")
9693
}
9794

9895
func (c *Client) DeleteFacebookMarketingSource(sourceId string) error {

api/source_freshdesk.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package api
33
import (
44
"encoding/json"
55
"fmt"
6-
7-
"github.com/zipstack/pct-plugin-framework/fwhelpers"
86
)
97

108
type SourceFreshdeskID struct {
@@ -80,10 +78,8 @@ func (c *Client) ReadFreshdeskSource(sourceId string) (SourceFreshdesk, error) {
8078
}
8179

8280
func (c *Client) UpdateFreshdeskSource(payload SourceFreshdesk) (SourceFreshdesk, error) {
83-
logger := fwhelpers.GetLogger()
84-
85-
logger.Print("[yellow]Update api is not yet exposed from Airbyte-Cloud[reset]")
86-
return SourceFreshdesk{}, nil
81+
// logger := fwhelpers.GetLogger()
82+
return SourceFreshdesk{}, fmt.Errorf("update resource is not supported")
8783
}
8884

8985
func (c *Client) DeleteFreshdeskSource(sourceId string) error {

api/source_google_analytics_v4.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package api
33
import (
44
"encoding/json"
55
"fmt"
6-
7-
"github.com/zipstack/pct-plugin-framework/fwhelpers"
86
)
97

108
type SourceGoogleAnalyticsV4ID struct {
@@ -85,10 +83,8 @@ func (c *Client) ReadGoogleAnalyticsV4Source(sourceId string) (SourceGoogleAnaly
8583
}
8684

8785
func (c *Client) UpdateGoogleAnalyticsV4Source(payload SourceGoogleAnalyticsV4) (SourceGoogleAnalyticsV4, error) {
88-
logger := fwhelpers.GetLogger()
89-
90-
logger.Print("[yellow]Update api is not yet exposed from Airbyte-Cloud[reset]")
91-
return SourceGoogleAnalyticsV4{}, nil
86+
// logger := fwhelpers.GetLogger()
87+
return SourceGoogleAnalyticsV4{}, fmt.Errorf("update resource is not supported")
9288
}
9389

9490
func (c *Client) DeleteGoogleAnalyticsV4Source(sourceId string) error {

api/source_google_sheets.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package api
33
import (
44
"encoding/json"
55
"fmt"
6-
7-
"github.com/zipstack/pct-plugin-framework/fwhelpers"
86
)
97

108
type SourceGoogleSheetsID struct {
@@ -83,10 +81,8 @@ func (c *Client) ReadGoogleSheetsSource(sourceId string) (SourceGoogleSheets, er
8381
}
8482

8583
func (c *Client) UpdateGoogleSheetsSource(payload SourceGoogleSheets) (SourceGoogleSheets, error) {
86-
logger := fwhelpers.GetLogger()
87-
88-
logger.Print("[yellow]Update api is not yet exposed from Airbyte-Cloud[reset]")
89-
return SourceGoogleSheets{}, nil
84+
// logger := fwhelpers.GetLogger()
85+
return SourceGoogleSheets{}, fmt.Errorf("update resource is not supported")
9086
}
9187

9288
func (c *Client) DeleteGoogleSheetsSource(sourceId string) error {

api/source_hubspot.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package api
33
import (
44
"encoding/json"
55
"fmt"
6-
7-
"github.com/zipstack/pct-plugin-framework/fwhelpers"
86
)
97

108
type SourceHubspotID struct {
@@ -82,10 +80,8 @@ func (c *Client) ReadHubspotSource(sourceId string) (SourceHubspot, error) {
8280
}
8381

8482
func (c *Client) UpdateHubspotSource(payload SourceHubspot) (SourceHubspot, error) {
85-
logger := fwhelpers.GetLogger()
86-
87-
logger.Print("[yellow]Update api is not yet exposed from Airbyte-Cloud[reset]")
88-
return SourceHubspot{}, nil
83+
// logger := fwhelpers.GetLogger()
84+
return SourceHubspot{}, fmt.Errorf("update resource is not supported")
8985
}
9086

9187
func (c *Client) DeleteHubspotSource(sourceId string) error {

api/source_pipedrive.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package api
33
import (
44
"encoding/json"
55
"fmt"
6-
7-
"github.com/zipstack/pct-plugin-framework/fwhelpers"
86
)
97

108
type SourcePipedriveID struct {
@@ -83,10 +81,8 @@ func (c *Client) ReadPipedriveSource(sourceId string) (SourcePipedrive, error) {
8381
}
8482

8583
func (c *Client) UpdatePipedriveSource(payload SourcePipedrive) (SourcePipedrive, error) {
86-
logger := fwhelpers.GetLogger()
87-
88-
logger.Print("[yellow]Update api is not yet exposed from Airbyte-Cloud[reset]")
89-
return SourcePipedrive{}, nil
84+
// logger := fwhelpers.GetLogger()
85+
return SourcePipedrive{}, fmt.Errorf("update resource is not supported")
9086
}
9187

9288
func (c *Client) DeletePipedriveSource(sourceId string) error {

0 commit comments

Comments
 (0)