Skip to content

Commit 4c6b103

Browse files
authored
Streaming package (#844)
1 parent 9b97ba4 commit 4c6b103

14 files changed

Lines changed: 558 additions & 65 deletions

Package.swift

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ let package = Package(
88
products: [
99
.library(name: "Split", targets: ["Split"]),
1010

11-
.library(name: "SplitCommons", targets: ["Logging", "Http", "BackoffCounter", "PeriodicRecorderWorker", "Tracker", "Concurrency"]),],
11+
.library(name: "SplitCommons", targets: ["Logging", "Http", "BackoffCounter", "PeriodicRecorderWorker", "Tracker", "Concurrency", "Streaming"]),],
1212
targets: [
1313

1414
// MARK: Split
1515
.target(
1616
name: "Split",
17-
dependencies: ["BackoffCounter", "Concurrency", "Http", "Logging", "PeriodicRecorderWorker", "Tracker"],
17+
dependencies: ["BackoffCounter", "Concurrency", "Http", "Logging", "PeriodicRecorderWorker", "Streaming", "Tracker"],
1818
path: "Split",
1919
exclude: [
2020
"Common/Yaml/LICENSE",
@@ -85,7 +85,7 @@ let package = Package(
8585
),
8686
.testTarget(
8787
name: "TrackerTests",
88-
dependencies: ["TrackerTests"],
88+
dependencies: ["Tracker"],
8989
path: "Sources/Tracker/Tests"
9090
),
9191

@@ -99,6 +99,17 @@ let package = Package(
9999
dependencies: ["Concurrency"],
100100
path: "Sources/Concurrency/Tests"
101101
),
102+
103+
.target(
104+
name: "Streaming",
105+
dependencies: ["Concurrency", "Http", "Logging"],
106+
exclude: ["Tests", "README.md"]
107+
),
108+
.testTarget(
109+
name: "StreamingTests",
110+
dependencies: ["Streaming"],
111+
path: "Sources/Streaming/Tests"
112+
),
102113
// #INJECT_TARGET
103114
]
104115
)

Split/Network/Streaming/EventStreamParser.swift renamed to Sources/Streaming/EventStreamParser.swift

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,17 @@
88

99
import Foundation
1010

11-
class EventStreamParser: @unchecked Sendable {
12-
static let kIdField = "id"
13-
static let kDataField = "data"
14-
static let kEventField = "event"
11+
public class EventStreamParser: @unchecked Sendable {
12+
public static let kIdField = "id"
13+
public static let kDataField = "data"
14+
public static let kEventField = "event"
1515
private static let kKeepAliveEvent = "keepalive"
1616
private static let kFieldSeparator: Character = ":"
1717
private static let kKeepAliveToken = "\(kFieldSeparator)\(kKeepAliveEvent)"
1818

19-
func parse(streamChunk: String) -> [String: String] {
19+
public init() {}
20+
21+
public func parse(streamChunk: String) -> [String: String] {
2022

2123
var messageValues = [String: String]()
2224
let messageLines = streamChunk.split(separator: "\n")
@@ -27,7 +29,7 @@ class EventStreamParser: @unchecked Sendable {
2729
return messageValues
2830
}
2931

30-
if trimmedLine.isEmpty() {
32+
if trimmedLine.isEmpty {
3133
return messageValues
3234
}
3335

@@ -47,7 +49,7 @@ class EventStreamParser: @unchecked Sendable {
4749
return messageValues
4850
}
4951

50-
func isKeepAlive(values: [String: String]) -> Bool {
52+
public func isKeepAlive(values: [String: String]) -> Bool {
5153
return values.contains { eventType, value in
5254
return eventType == Self.kEventField &&
5355
value.trimmingCharacters(in: .whitespacesAndNewlines).lowercased() == Self.kKeepAliveEvent

Sources/Streaming/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Streaming
Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,26 @@
77
//
88

99
import Foundation
10-
11-
struct SseClientConstants {
12-
static let pushNotificationChannelsParam = "channels"
13-
static let pushNotificationTokenParam = "accessToken"
14-
static let pushNotificationVersionParam = "v"
15-
static let pushNotificationVersionValue = "1.1"
10+
import Concurrency
11+
import Http
12+
import Logging
13+
14+
public struct SseClientConstants {
15+
public static let pushNotificationChannelsParam = "channels"
16+
public static let pushNotificationTokenParam = "accessToken"
17+
public static let pushNotificationVersionParam = "v"
18+
public static let pushNotificationVersionValue = "1.1"
1619
}
1720

18-
protocol SseClient: AnyObject {
21+
public protocol SseClient: AnyObject {
1922
typealias CompletionHandler = @Sendable (Bool) -> Void
2023
func connect(token: String, channels: [String], completion: @escaping CompletionHandler)
2124
func disconnect()
2225
var isConnectionOpened: Bool { get }
26+
2327
}
2428

25-
class DefaultSseClient: SseClient, @unchecked Sendable {
29+
public class DefaultSseClient: SseClient, @unchecked Sendable {
2630

2731
///
2832
/// NOTE:
@@ -38,18 +42,18 @@ class DefaultSseClient: SseClient, @unchecked Sendable {
3842
private var isDisconnectCalled: Atomic<Bool> = Atomic(false)
3943
private var isConnected: Atomic<Bool> = Atomic(false)
4044
private var isFirstMessage: Atomic<Bool> = Atomic(false)
41-
var isConnectionOpened: Bool {
45+
public var isConnectionOpened: Bool {
4246
return isConnected.value
4347
}
4448

45-
init(endpoint: Endpoint, httpClient: HttpClient, sseHandler: SseHandler) {
49+
public init(endpoint: Endpoint, httpClient: HttpClient, sseHandler: SseHandler) {
4650
self.endpoint = endpoint
4751
self.httpClient = httpClient
4852
self.sseHandler = sseHandler
4953
self.queue = DispatchQueue(label: "split-sse-client")
5054
}
5155

52-
func connect(token: String, channels: [String], completion: @escaping CompletionHandler) {
56+
public func connect(token: String, channels: [String], completion: @escaping CompletionHandler) {
5357
queue.async(flags: .barrier) { [weak self] in
5458
guard let self = self else { return }
5559
let parameters: [String: Any] = [
@@ -105,7 +109,7 @@ class DefaultSseClient: SseClient, @unchecked Sendable {
105109
}
106110
}
107111

108-
func disconnect() {
112+
public func disconnect() {
109113
Logger.d("Disconnecting SSE client")
110114
isDisconnectCalled.set(true)
111115
isConnected.set(false)
@@ -132,7 +136,7 @@ class DefaultSseClient: SseClient, @unchecked Sendable {
132136

133137
guard let self = self else { return }
134138

135-
let values = self.streamParser.parse(streamChunk: data.stringRepresentation)
139+
let values = self.streamParser.parse(streamChunk: String(data: data, encoding: .utf8) ?? "")
136140

137141
if self.isFirstMessage.value {
138142
if self.isConnectionConfirmed(message: values) {

Split/Network/Streaming/SseClientFactory.swift renamed to Sources/Streaming/SseClientFactory.swift

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,26 @@
77
//
88

99
import Foundation
10+
import Http
1011

11-
protocol SseClientFactory {
12+
public protocol SseClientFactory {
1213
func create() -> SseClient
1314
}
1415

15-
class DefaultSseClientFactory: SseClientFactory {
16+
public class DefaultSseClientFactory: SseClientFactory {
1617
private let endpoint: Endpoint
1718
private let httpClient: HttpClient
1819
private let sseHandler: SseHandler
1920

20-
init(endpoint: Endpoint,
21+
public init(endpoint: Endpoint,
2122
httpClient: HttpClient,
2223
sseHandler: SseHandler) {
2324
self.endpoint = endpoint
2425
self.httpClient = httpClient
2526
self.sseHandler = sseHandler
2627
}
2728

28-
func create() -> SseClient {
29+
public func create() -> SseClient {
2930
DefaultSseClient(endpoint: endpoint,
3031
httpClient: httpClient,
3132
sseHandler: sseHandler)

Split/Network/Streaming/SseConnectionHandler.swift renamed to Sources/Streaming/SseConnectionHandler.swift

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,33 +7,35 @@
77
//
88

99
import Foundation
10+
import Concurrency
11+
import Logging
1012

11-
class SseConnectionHandler: @unchecked Sendable {
13+
public class SseConnectionHandler: @unchecked Sendable {
1214
private let clientLock = NSLock()
1315
private let sseClientFactory: SseClientFactory
1416
private var curClientId: String?
1517
private let clients = SynchronizedDictionary<String, SseClient>()
1618

17-
var isConnectionOpened: Bool {
19+
public var isConnectionOpened: Bool {
1820
guard let id = curClientId else { return false }
1921
return clients.value(forKey: id)?.isConnectionOpened ?? false
2022
}
2123

22-
init(sseClientFactory: SseClientFactory) {
24+
public init(sseClientFactory: SseClientFactory) {
2325
self.sseClientFactory = sseClientFactory
2426
}
2527

26-
func connect(jwt: JwtToken, channels: [String], completion: @escaping SseClient.CompletionHandler) {
28+
public func connect(token: String, channels: [String], completion: @escaping SseClient.CompletionHandler) {
2729
let sseClient = sseClientFactory.create()
2830
addSseClient(sseClient)
29-
sseClient.connect(token: jwt.rawToken, channels: jwt.channels, completion: completion)
31+
sseClient.connect(token: token, channels: channels, completion: completion)
3032
}
3133

32-
func disconnect() {
34+
public func disconnect() {
3335
Logger.d("Streaming Connection Handler - Disconnecting SSE client")
3436
let disconnectingClientId = curClientId
3537
clearClientId()
36-
DispatchQueue.general.async { [weak self] in
38+
DispatchQueue.global().async { [weak self] in
3739
guard let self = self else { return }
3840
guard let clientId = disconnectingClientId else { return }
3941
let cli = self.getSseClient(id: clientId)
@@ -42,7 +44,7 @@ class SseConnectionHandler: @unchecked Sendable {
4244
}
4345
}
4446

45-
func destroy() {
47+
public func destroy() {
4648
for client in clients.takeAll().values {
4749
client.disconnect()
4850
}

Sources/Streaming/SseHandler.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import Foundation
2+
3+
public protocol SseHandler: AnyObject {
4+
func isConnectionConfirmed(message: [String: String]) -> Bool
5+
func handleIncomingMessage(message: [String: String])
6+
func reportError(isRetryable: Bool)
7+
}

Sources/Streaming/Streaming.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
import Foundation
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import XCTest
2+
@testable import Streaming
3+
4+
final class StreamingTests: XCTestCase {
5+
func testExample() {
6+
// Add your tests here
7+
XCTAssertTrue(true)
8+
}
9+
}

0 commit comments

Comments
 (0)