This commit is contained in:
tony_all 2023-04-16 17:16:38 +08:00
parent 31109849fe
commit fdaa4a57eb
13 changed files with 100 additions and 16 deletions

View File

@ -8,6 +8,7 @@ import cc.maxmc.msm.child.command.Send
import cc.maxmc.msm.child.listener.ProtocolListener
import cc.maxmc.msm.child.netty.NetClient
import cc.maxmc.msm.child.settings.Settings
import cc.maxmc.msm.child.settings.SettingsReader
import cc.maxmc.msm.common.network.netty.NetworkRegistry
import net.md_5.bungee.api.ProxyServer
import net.md_5.bungee.api.plugin.Plugin
@ -18,13 +19,14 @@ class MultiServerMan : Plugin() {
}
override fun onEnable() {
MultiServerManAPIProvider.register(APIImpl)
ProxyServer.getInstance().pluginManager.registerCommand(this, Send)
ProxyServer.getInstance().pluginManager.registerCommand(this, Api)
ProxyServer.getInstance().pluginManager.registerListener(this, APIPacketListener)
ProxyServer.getInstance().pluginManager.registerListener(this, ProtocolListener)
SettingsReader
NetworkRegistry
NetClient.init(Settings.Parent.address, Settings.Parent.port)
MultiServerManAPIProvider.register(APIImpl)
}
override fun onDisable() {

View File

@ -6,10 +6,7 @@ import cc.maxmc.msm.child.netty.NetClient
import cc.maxmc.msm.child.settings.Settings
import cc.maxmc.msm.common.event.ChannelInactiveEvent
import cc.maxmc.msm.common.event.PacketReceiveEvent
import cc.maxmc.msm.common.network.packet.CPacketGetInfo
import cc.maxmc.msm.common.network.packet.CPacketRequestServer
import cc.maxmc.msm.common.network.packet.PPacketChildInfo
import cc.maxmc.msm.common.network.packet.PPacketServerStarted
import cc.maxmc.msm.common.network.packet.*
import cc.maxmc.msm.common.utils.log
import cc.maxmc.msm.common.utils.pluginScope
import kotlinx.coroutines.launch
@ -21,7 +18,7 @@ import kotlin.io.path.listDirectoryEntries
import kotlin.io.path.name
object ProtocolListener : Listener {
val serverCache = HashMap<UUID, SubServer>()
private val serverCache = HashMap<UUID, SubServer>()
@EventHandler
fun onGetInfo(packetE: PacketReceiveEvent) {
@ -36,7 +33,9 @@ object ProtocolListener : Listener {
fun onRequestServer(packetE: PacketReceiveEvent) {
val packet = packetE.packet as? CPacketRequestServer ?: return
val serverInfo = packet.serverInfo
log("§a| §7正在启动服务器 ${serverInfo.server}")
val subServer = SubServer(serverInfo.uid, packet.type, serverInfo.server!!.split(":")[1].toInt())
serverCache[serverInfo.uid] = subServer
pluginScope.launch {
subServer.initServer()
subServer.startServer()
@ -44,10 +43,16 @@ object ProtocolListener : Listener {
}
}
@EventHandler
fun onDisconnect(packet: ChannelInactiveEvent) {
repeat(10) {
log("Remote Disconnected.")
}
}
@EventHandler
fun onServerEnd(evt: PacketReceiveEvent) {
val packet = evt.packet as? CPacketEndServer ?: return
packet.server
}
}

View File

@ -12,13 +12,14 @@ import kotlin.io.path.copyToRecursively
import kotlin.io.path.createDirectories
import kotlin.io.path.deleteRecursively
@Suppress("CanBeParameter", "MemberVisibilityCanBePrivate") // API
@OptIn(kotlin.io.path.ExperimentalPathApi::class)
class SubServer(
val uid: UUID, val type: String, val port: Int, baseFolder: Path = MultiServerMan.instance.dataFolder.toPath()
) {
private val serverFolder =
baseFolder.resolve("cache").resolve(uid.toString().substring(0..7)).also { it.createDirectories() }
private val patternFolder = baseFolder.resolve("pattern").resolve(type).also { it.createDirectories() }
private val patternFolder = baseFolder.resolve("patterns").resolve(type).also { it.createDirectories() }
private val runner = ScriptRunner(serverFolder.resolve("start.sh"))
suspend fun initServer() = withContext(Dispatchers.IO) {

View File

@ -4,14 +4,15 @@ import cc.maxmc.msm.common.network.BungeePacket
import cc.maxmc.msm.common.network.netty.NetworkRegistry
import cc.maxmc.msm.common.utils.log
import cc.maxmc.msm.common.utils.pipelineInit
import cc.maxmc.msm.common.utils.pluginScope
import io.netty.bootstrap.Bootstrap
import io.netty.channel.Channel
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelOption
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioSocketChannel
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
object NetClient {
private val loop = NioEventLoopGroup()

View File

@ -11,8 +11,12 @@ import kotlin.io.path.*
object SettingsReader {
private val file = MultiServerMan.instance.dataFolder.toPath().resolve("settings.yml")
val config: Configuration
private val basePatternFolder = MultiServerMan.instance.dataFolder.toPath().resolve("patterns")
init {
if (!basePatternFolder.exists()) {
basePatternFolder.createDirectories()
}
if (!file.exists()) {
MultiServerMan.instance.dataFolder.toPath().createDirectories()
val stream = MultiServerMan.instance.getResourceAsStream("settings.yml")

View File

@ -1,5 +1,6 @@
package cc.maxmc.msm.child.utils
import cc.maxmc.msm.common.utils.log
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import java.nio.file.Path
@ -23,6 +24,7 @@ class ScriptRunner(private val scriptPath: Path) {
builder.command("powershell.exe", scriptPath.name, *args)
}
process = builder.start()
log("§c§lProcess Started")
thread = thread {
while (true) {
if (listeners.isEmpty()) {
@ -30,7 +32,6 @@ class ScriptRunner(private val scriptPath: Path) {
}
var line = ""
while (reader.readLine()?.also { line = it } != null) {
println(line)
listeners.forEach {
if (line.contains(it.first)) it.second()
}

View File

@ -23,6 +23,7 @@ object NetworkRegistry {
registerPacket(PacketDirection.CHILD_BOUND, CPacketAPICallback.CPacketCallbackGetServer::class.java)
registerPacket(PacketDirection.CHILD_BOUND, CPacketRequestServer::class.java)
registerPacket(PacketDirection.CHILD_BOUND, CPacketGetInfo::class.java)
registerPacket(PacketDirection.CHILD_BOUND, CPacketEndServer::class.java)
}

View File

@ -0,0 +1,19 @@
package cc.maxmc.msm.common.network.packet
import cc.maxmc.msm.api.misc.ServerInfo
import cc.maxmc.msm.common.network.BungeePacket
import cc.maxmc.msm.common.utils.readServerInfo
import cc.maxmc.msm.common.utils.writeServerInfo
import io.netty.buffer.ByteBuf
class CPacketEndServer(
var server: ServerInfo
) : BungeePacket() {
override fun encode(buf: ByteBuf) {
buf.writeServerInfo(server)
}
override fun decode(buf: ByteBuf) {
server = buf.readServerInfo()
}
}

View File

@ -20,6 +20,7 @@ dependencies {
}
tasks.shadowJar {
archiveClassifier.set(null as? String?)
relocate("kotlin", "cc.maxmc.msm.lib.kotlin")
}

View File

@ -7,6 +7,7 @@ import cc.maxmc.msm.parent.manager.MatchManager
import cc.maxmc.msm.parent.manager.ServerManager
import cc.maxmc.msm.parent.netty.NetManager
import net.md_5.bungee.api.ProxyServer
import net.md_5.bungee.api.connection.Server
import net.md_5.bungee.api.plugin.Plugin
class MultiServerMan : Plugin() {
@ -21,6 +22,7 @@ class MultiServerMan : Plugin() {
}
override fun onDisable() {
ServerManager.end()
NetManager.shutdownServer()
}

View File

@ -22,6 +22,8 @@ object ChildManager {
}
val child = ChildBungee(channel, packet.portRange, packet.types)
children.add(child)
ServerManager.initChild(child)
log("§a| §7成功将 ${channel.remoteAddress()} 注册到集群!")
}
}
@ -30,6 +32,10 @@ object ChildManager {
}
fun requestChild(type: String): ChildBungee {
children.forEach {
val ports = it.getAvailablePorts()
log("${ports.size} ports: $ports")
}
return children.filter { it.getAvailablePorts().isNotEmpty() && it.types.contains(type) }
.maxByOrNull { it.getAvailablePorts().size } ?: throw IllegalStateException("当前无可用端口开启新服务器.")
}

View File

@ -1,14 +1,50 @@
package cc.maxmc.msm.parent.manager
import cc.maxmc.msm.api.misc.ServerInfo
import cc.maxmc.msm.common.network.packet.CPacketEndServer
import cc.maxmc.msm.common.utils.log
import cc.maxmc.msm.common.utils.pluginScope
import cc.maxmc.msm.parent.misc.ChildBungee
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import java.util.*
import java.util.concurrent.ConcurrentHashMap
object ServerManager {
private val serverMap = ConcurrentHashMap<String, MutableList<ServerInfo>>()
private val childMap = ConcurrentHashMap<ServerInfo, ChildBungee>()
private val serverChannel = Channel<suspend () -> Unit>()
init {
pluginScope.launch {
for (func in serverChannel) {
log("execute one")
func.invoke()
}
}
}
fun end() {
serverChannel.close()
}
suspend fun initChild(child: ChildBungee) {
serverChannel.send {
log("§b| §7正在初始化 ${child.channel.remoteAddress()}")
child.types.forEach {
if (!serverMap.containsKey(it)) {
serverMap[it] = ArrayList()
}
if (serverMap[it]!!.size <= 2) {
pluginScope.launch {
repeat(2) { _ ->
requestServer(it)
}
}
}
}
}
}
fun consumeServer(type: String): ServerInfo {
val servers = getAvailableServers(type)
@ -17,7 +53,7 @@ object ServerManager {
if (servers.size - 1 <= 2) {
pluginScope.launch {
log("§b| §7剩余 $type 服务器不足,正在启动新服务端。")
val server = requireServer(type)
val server = requestServer(type)
log("§b| §7类型 $type 服务器启动成功: ${server.server}")
}
}
@ -30,6 +66,8 @@ object ServerManager {
it.find { server -> server.uid == uid }?.let { server -> result = server; true } ?: false
} ?: throw IllegalStateException("Illegal state.")
list.remove(result)
val child = childMap.remove(result)!!
result?.let { child.sendPacket(CPacketEndServer(it)) }
}
fun getServerById(uid: UUID): ServerInfo? {
@ -40,11 +78,12 @@ object ServerManager {
return serverMap[type]!!.filter { it.isAvailable }
}
private suspend fun requireServer(type: String): ServerInfo {
private suspend fun requestServer(type: String): ServerInfo {
val child = ChildManager.requestChild(type)
val list = serverMap.getOrPut(type) { ArrayList() }
val list = serverMap[type]!!
val server = child.requestServer(type)
list.add(server)
childMap[server] = child
return server
}
}

View File

@ -12,21 +12,23 @@ import java.util.*
class ChildBungee(
val channel: Channel, val ports: Set<Int>, val types: Set<String>, val usedPorts: MutableSet<Int> = HashSet()
) {
private fun sendPacket(packet: BungeePacket) {
fun sendPacket(packet: BungeePacket) {
channel.writeAndFlush(packet)
}
suspend fun requestServer(type: String): ServerInfo {
val uid = UUID.randomUUID()
val port = getAvailablePorts().first()
val server = ServerInfo(
uid,
"${(channel.remoteAddress() as InetSocketAddress).hostString}:${getAvailablePorts().first()}"
"${(channel.remoteAddress() as InetSocketAddress).hostString}:$port"
)
val cPacket = CPacketRequestServer(type, server)
sendPacket(cPacket)
val packet = awaitPacket(PPacketServerStarted::class.java) { ch, packet ->
ch == channel && packet.serverInfo == server
}
usedPorts += port
return packet.serverInfo
}