7
7
package com.almasb.fxgl.multiplayer
8
8
9
9
import com.almasb.fxgl.core.EngineService
10
- import com.almasb.fxgl.core.collection.PropertyChangeListener
10
+ import com.almasb.fxgl.core.collection.MovingAverageQueue
11
11
import com.almasb.fxgl.core.collection.PropertyMap
12
12
import com.almasb.fxgl.core.collection.PropertyMapChangeListener
13
13
import com.almasb.fxgl.core.serialization.Bundle
@@ -18,6 +18,8 @@ import com.almasb.fxgl.event.EventBus
18
18
import com.almasb.fxgl.input.*
19
19
import com.almasb.fxgl.logging.Logger
20
20
import com.almasb.fxgl.net.Connection
21
+ import javafx.beans.property.ReadOnlyDoubleProperty
22
+ import javafx.beans.property.ReadOnlyDoubleWrapper
21
23
22
24
/* *
23
25
* TODO: symmetric remove API, e.g. removeReplicationSender()
@@ -30,18 +32,56 @@ class MultiplayerService : EngineService() {
30
32
31
33
private val replicatedEntitiesMap = hashMapOf<Connection <Bundle >, ConnectionData > ()
32
34
35
/**
 * Starts tracking [connection]: creates its [ConnectionData], wires up the
 * ping / pong handlers via [setUpNewConnection] and stores the data in the
 * map of known connections.
 *
 * Calling this again for a connection that is already tracked is a no-op.
 *
 * @param connection the connection to track
 */
fun registerConnection(connection: Connection<Bundle>) {
    // A repeat registration would replace the stored ConnectionData (dropping its
    // entity list) and run setUpNewConnection() for the same connection a second time.
    if (connection in replicatedEntitiesMap) {
        return
    }

    val data = ConnectionData(connection)
    setUpNewConnection(data)

    replicatedEntitiesMap[connection] = data
}
41
+
42
/**
 * Wires the event bus of [data] to its connection and installs the two
 * handlers used for measuring round-trip time:
 *
 * - on PING, a pong is fired back carrying the ping's `timeSent` and the local receive time;
 * - on PONG, `System.nanoTime() - timeSent` is pushed into the ping buffer and
 *   the buffer average is written to the ping property.
 */
private fun setUpNewConnection(data: ConnectionData) {
    val bus = data.eventBus

    // register event handler for the given connection
    // TODO: how to clean up when the connection dies
    addEventReplicationReceiver(data.connection, bus)

    bus.addEventHandler(ReplicationEvent.PING) { request ->
        // take the timestamp first so it is as close as possible to the receive moment
        val receivedAt = System.nanoTime()
        fire(data.connection, PongReplicationEvent(request.timeSent, receivedAt))
    }

    bus.addEventHandler(ReplicationEvent.PONG) { reply ->
        // NOTE(review): assumes timeSent is the nanoTime value this endpoint put into its ping - confirm
        val elapsed = System.nanoTime() - reply.timeSent

        data.pingBuffer.put(elapsed.toDouble())
        data.ping.value = data.pingBuffer.average
    }
}
60
+
33
61
/**
 * Per-frame update: fires a ping stamped with the current `System.nanoTime()`
 * to every known connection and pushes updates for connections that have
 * at least one tracked entity. Does nothing when no connection is known.
 */
override fun onGameUpdate(tpf: Double) {
    if (replicatedEntitiesMap.isEmpty()) {
        return
    }

    // one timestamp shared by all pings sent during this update
    val pingTimestamp = System.nanoTime()

    // TODO: can (should) we move this to NetworkComponent to act on a per entity basis ...
    for ((connection, connectionData) in replicatedEntitiesMap) {
        fire(connection, PingReplicationEvent(pingTimestamp))

        val tracked = connectionData.entities
        if (tracked.isNotEmpty()) {
            updateReplicatedEntities(connection, tracked)
        }
    }
}
44
76
77
/**
 * Looks up the ping property kept for the given [connection].
 *
 * The property holds the average of the recorded round-trip samples; each
 * sample is a difference of `System.nanoTime()` readings taken when a pong
 * arrives.
 *
 * @param connection a connection previously passed to [registerConnection]
 * @return round-trip time from this endpoint to given [connection]
 * @throws IllegalArgumentException if [connection] is not known to this service
 */
fun pingProperty(connection: Connection<Bundle>): ReadOnlyDoubleProperty {
    // fail with a descriptive message instead of a bare NullPointerException
    val data = requireNotNull(replicatedEntitiesMap[connection]) {
        "Connection is not registered with MultiplayerService: $connection"
    }

    return data.ping.readOnlyProperty
}
84
+
45
85
private fun updateReplicatedEntities (connection : Connection <Bundle >, entities : MutableList <Entity >) {
46
86
val events = arrayListOf<ReplicationEvent >()
47
87
@@ -73,9 +113,9 @@ class MultiplayerService : EngineService() {
73
113
74
114
val event = EntitySpawnEvent (networkComponent.id, entityName, entity.x, entity.y, entity.z)
75
115
76
- val data = replicatedEntitiesMap.getOrDefault(connection, ConnectionData ())
116
+ // TODO: if not available
117
+ val data = replicatedEntitiesMap[connection]!!
77
118
data.entities + = entity
78
- replicatedEntitiesMap[connection] = data
79
119
80
120
fire(connection, event)
81
121
}
@@ -237,7 +277,11 @@ class MultiplayerService : EngineService() {
237
277
}
238
278
}
239
279
240
- private class ConnectionData {
280
/**
 * State the service keeps for a single [connection].
 */
private class ConnectionData(val connection: Connection<Bundle>) {

    /** Entities tracked for this connection. */
    val entities = arrayListOf<Entity>()

    /** Event bus created for this connection; its logging is switched off. */
    val eventBus = EventBus().apply { isLoggingEnabled = false }

    /**
     * Buffer of round-trip samples whose average feeds [ping].
     * NOTE(review): 1000 is presumably the maximum number of samples kept - confirm against MovingAverageQueue.
     */
    val pingBuffer = MovingAverageQueue(1000)

    /** Writable holder of the averaged round-trip value. */
    val ping = ReadOnlyDoubleWrapper()
}
243
287
}
0 commit comments