This commit is contained in:
TONY_All 2023-04-05 20:13:19 +08:00
parent e1459c3e8e
commit 5681164469
19 changed files with 301 additions and 9 deletions

View File

@ -8,6 +8,15 @@ public class ServerInfo {
@Nullable @Nullable
private final HostAndPort server; private final HostAndPort server;
private final int id; private final int id;
private boolean available;
public boolean isAvailable() {
return available;
}
public void setAvailable(boolean available) {
this.available = available;
}
public ServerInfo() { public ServerInfo() {
server = HostAndPort.fromString("127.0.0.1"); server = HostAndPort.fromString("127.0.0.1");

View File

@ -0,0 +1,10 @@
import cc.maxmc.msm.child.misc.SubServer
import kotlin.io.path.Path
suspend fun main() {
val server = SubServer(10086, "test", 25571, Path("/Users/tony_all/Servers/MSM/"))
server.initServer()
server.startServer()
Thread.sleep(10000)
server.cleanup()
}

View File

@ -0,0 +1,40 @@
package cc.maxmc.msm.child.misc
import cc.maxmc.msm.child.MultiServerMan
import cc.maxmc.msm.child.utils.ScriptRunner
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import java.nio.file.Path
import kotlin.io.path.copyToRecursively
import kotlin.io.path.createDirectories
import kotlin.io.path.deleteRecursively
@OptIn(kotlin.io.path.ExperimentalPathApi::class)
class SubServer(
val id: Int,
val type: String,
val port: Int,
baseFolder: Path = MultiServerMan.instance.dataFolder.toPath()
) {
private val serverFolder = baseFolder.resolve("cache").resolve(id.toString()).also { it.createDirectories() }
private val patternFolder = baseFolder.resolve("pattern").resolve(type).also { it.createDirectories() }
private val runner = ScriptRunner(serverFolder.resolve("start.sh"))
suspend fun initServer() = withContext(Dispatchers.IO) {
patternFolder.copyToRecursively(serverFolder, followLinks = false, overwrite = true)
}
fun startServer() {
runner.launch(port.toString())
runner.onOutput("For help, type \"help\" or \"?\"") {
println("Started.")
}
}
suspend fun cleanup() {
runner.exec("stop")
println("Exec stop")
runner.exit()
serverFolder.deleteRecursively()
}
}

View File

@ -0,0 +1,54 @@
package cc.maxmc.msm.child.utils
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import java.nio.file.Path
import java.util.concurrent.CopyOnWriteArrayList
import kotlin.concurrent.thread
import kotlin.io.path.extension
import kotlin.io.path.name
class ScriptRunner(private val scriptPath: Path) {
private lateinit var process: Process
lateinit var thread: Thread
private val listeners = CopyOnWriteArrayList<Pair<String, () -> Unit>>()
private val writer by lazy { process.outputStream.bufferedWriter() }
private val reader by lazy { process.inputStream.bufferedReader() }
fun launch(vararg args: String) {
val builder = ProcessBuilder().directory(scriptPath.parent.toFile())
if (scriptPath.extension == "sh") {
builder.command("bash", scriptPath.name, *args)
} else {
builder.command("powershell.exe", scriptPath.name, *args)
}
process = builder.start()
thread = thread {
while (true) {
if (listeners.isEmpty()) {
continue
}
var line = ""
while (reader.readLine()?.also { line = it } != null) {
println(line)
listeners.forEach {
if (line.contains(it.first)) it.second()
}
}
}
}
}
fun exec(command: String) {
writer.appendLine(command)
writer.flush()
}
fun onOutput(string: String, func: () -> Unit) {
listeners.add(string to func)
}
suspend fun exit(): Int = withContext(Dispatchers.IO) {
process.waitFor()
}
}

View File

@ -4,4 +4,4 @@ parent:
ports: ports:
- 30000 - 30000
- 30001..30019 - 30001..30019
- - 30443

View File

@ -13,6 +13,7 @@ repositories {
dependencies { dependencies {
api(project(":api")) api(project(":api"))
implementation(kotlin("stdlib")) implementation(kotlin("stdlib"))
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.0-Beta")
@Suppress("VulnerableLibrariesLocal") @Suppress("VulnerableLibrariesLocal")
compileOnly("io.github.waterfallmc:waterfall-api:1.19-R0.1-SNAPSHOT") compileOnly("io.github.waterfallmc:waterfall-api:1.19-R0.1-SNAPSHOT")
compileOnly("net.md-5:bungeecord-proxy:1.19-R0.1-SNAPSHOT") { compileOnly("net.md-5:bungeecord-proxy:1.19-R0.1-SNAPSHOT") {

View File

@ -0,0 +1,6 @@
package cc.maxmc.msm.common.event
import io.netty.channel.Channel
import net.md_5.bungee.api.plugin.Event
class ChannelActiveEvent(val channel: Channel): Event()

View File

@ -0,0 +1,6 @@
package cc.maxmc.msm.common.event
import io.netty.channel.Channel
import net.md_5.bungee.api.plugin.Event
class ChannelInactiveEvent(val channel: Channel): Event()

View File

@ -1,7 +1,9 @@
package cc.maxmc.msm.common.network package cc.maxmc.msm.common.network
import cc.maxmc.msm.common.event.ChannelActiveEvent
import cc.maxmc.msm.common.event.ChannelInactiveEvent
import cc.maxmc.msm.common.event.PacketReceiveEvent import cc.maxmc.msm.common.event.PacketReceiveEvent
import cc.maxmc.msm.common.utils.debug import cc.maxmc.msm.common.utils.awaiting
import io.netty.channel.ChannelHandler.Sharable import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler import io.netty.channel.SimpleChannelInboundHandler
@ -9,7 +11,16 @@ import io.netty.channel.SimpleChannelInboundHandler
@Sharable @Sharable
object ClusterPacketHandler : SimpleChannelInboundHandler<BungeePacket>() { object ClusterPacketHandler : SimpleChannelInboundHandler<BungeePacket>() {
override fun channelRead0(ctx: ChannelHandlerContext, msg: BungeePacket) { override fun channelRead0(ctx: ChannelHandlerContext, msg: BungeePacket) {
// debug("call event") awaiting.filter { it.first == msg::class.java }
.
PacketReceiveEvent(ctx.channel(), msg).callEvent() PacketReceiveEvent(ctx.channel(), msg).callEvent()
} }
override fun channelActive(ctx: ChannelHandlerContext) {
ChannelActiveEvent(ctx.channel()).callEvent()
}
override fun channelInactive(ctx: ChannelHandlerContext) {
ChannelInactiveEvent(ctx.channel()).callEvent()
}
} }

View File

@ -0,0 +1,11 @@
package cc.maxmc.msm.common.network.packet
import cc.maxmc.msm.common.network.BungeePacket
import io.netty.buffer.ByteBuf
class CPacketGetInfo : BungeePacket() {
override fun encode(buf: ByteBuf) {}
override fun decode(buf: ByteBuf) {}
}

View File

@ -0,0 +1,21 @@
package cc.maxmc.msm.common.network.packet
import cc.maxmc.msm.common.network.BungeePacket
import io.netty.buffer.ByteBuf
import net.md_5.bungee.protocol.DefinedPacket
import java.util.*
class CPacketRequestServer(
var type: String,
var uid: UUID = UUID.randomUUID()
) : BungeePacket() {
override fun encode(buf: ByteBuf) {
DefinedPacket.writeUUID(uid, buf)
DefinedPacket.writeString(type, buf)
}
override fun decode(buf: ByteBuf) {
uid = DefinedPacket.readUUID(buf)
type = DefinedPacket.readString(buf)
}
}

View File

@ -0,0 +1,29 @@
package cc.maxmc.msm.common.network.packet
import cc.maxmc.msm.common.network.BungeePacket
import io.netty.buffer.ByteBuf
import net.md_5.bungee.protocol.DefinedPacket
class PPacketChildInfo(var portRange: MutableSet<Int> = HashSet(), var types: MutableSet<String>) : BungeePacket() {
override fun encode(buf: ByteBuf) {
DefinedPacket.writeVarInt(portRange.size, buf)
portRange.forEach {
DefinedPacket.writeVarInt(it, buf)
}
DefinedPacket.writeVarInt(types.size, buf)
types.forEach {
DefinedPacket.writeString(it, buf)
}
}
override fun decode(buf: ByteBuf) {
val size = DefinedPacket.readVarInt(buf)
repeat(size) {
portRange.add(DefinedPacket.readVarInt(buf))
}
val typeSize = DefinedPacket.readVarInt(buf)
repeat(typeSize) {
types.add(DefinedPacket.readString(buf))
}
}
}

View File

@ -0,0 +1,22 @@
package cc.maxmc.msm.common.utils
import cc.maxmc.msm.common.network.BungeePacket
import io.netty.channel.Channel
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlin.coroutines.Continuation
import kotlin.coroutines.suspendCoroutine
val pluginScope = CoroutineScope(SupervisorJob() + CoroutineExceptionHandler { _, except ->
log("§c执行异步操作时出现异常 ${except.message}")
except.printStackTrace()
})
val awaiting = ArrayList<Triple<Class<out BungeePacket>, (BungeePacket) -> Boolean, Continuation<BungeePacket>>>()
suspend inline fun <T : BungeePacket> awaitPacket(packetClass: Class<T>, noinline filter: (Channel, T) -> Boolean) =
suspendCoroutine<T> {
val triple = Triple(packetClass, filter, it)
@Suppress("UNCHECKED_CAST")
awaiting.add(triple as Triple<Class<out BungeePacket>, (BungeePacket) -> Boolean, Continuation<BungeePacket>>)
}

View File

@ -14,6 +14,7 @@ dependencies {
implementation(kotlin("stdlib")) implementation(kotlin("stdlib"))
implementation(project(":common")) implementation(project(":common"))
implementation("com.zaxxer:HikariCP:4.0.3") implementation("com.zaxxer:HikariCP:4.0.3")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.0-Beta")
@Suppress("VulnerableLibrariesLocal") @Suppress("VulnerableLibrariesLocal")
compileOnly("io.github.waterfallmc:waterfall-api:1.19-R0.1-SNAPSHOT") compileOnly("io.github.waterfallmc:waterfall-api:1.19-R0.1-SNAPSHOT")
} }

View File

@ -0,0 +1,9 @@
package cc.maxmc.msm.parent
import sun.misc.Signal
import java.lang.management.ManagementFactory
import kotlin.system.exitProcess
fun main() {
}

View File

@ -1,13 +1,15 @@
package cc.maxmc.msm.parent.listener package cc.maxmc.msm.parent.listener
import cc.maxmc.msm.api.MultiServerManAPIProvider import cc.maxmc.msm.api.MultiServerManAPIProvider
import cc.maxmc.msm.common.event.ChannelActiveEvent
import cc.maxmc.msm.common.event.ChannelInactiveEvent
import cc.maxmc.msm.common.event.PacketReceiveEvent import cc.maxmc.msm.common.event.PacketReceiveEvent
import cc.maxmc.msm.common.network.packet.CPacketAPICallback import cc.maxmc.msm.common.network.packet.CPacketAPICallback
import cc.maxmc.msm.common.network.packet.CPacketDebug import cc.maxmc.msm.common.network.packet.CPacketDebug
import cc.maxmc.msm.common.network.packet.PPacketAPICall import cc.maxmc.msm.common.network.packet.PPacketAPICall
import cc.maxmc.msm.common.network.packet.PPacketDebug import cc.maxmc.msm.common.network.packet.PPacketDebug
import cc.maxmc.msm.common.utils.debug
import cc.maxmc.msm.common.utils.log import cc.maxmc.msm.common.utils.log
import cc.maxmc.msm.parent.manager.ChildManager
import net.md_5.bungee.api.plugin.Listener import net.md_5.bungee.api.plugin.Listener
import net.md_5.bungee.event.EventHandler import net.md_5.bungee.event.EventHandler
@ -48,4 +50,13 @@ object PacketListener : Listener {
log("§fDEBUG | §7收到: \"${packet.content}\"") log("§fDEBUG | §7收到: \"${packet.content}\"")
evt.channel.writeAndFlush(CPacketDebug("(${evt.channel.localAddress()}) - ${packet.content}")) evt.channel.writeAndFlush(CPacketDebug("(${evt.channel.localAddress()}) - ${packet.content}"))
} }
@EventHandler
fun onChannelActive(evt: ChannelActiveEvent) {
ChildManager.registerChild(evt.channel)
}
fun onChannelInactive(evt: ChannelInactiveEvent) {
ChildManager.unregisterChild(evt.channel)
}
} }

View File

@ -1,11 +1,36 @@
package cc.maxmc.msm.parent.manager package cc.maxmc.msm.parent.manager
import cc.maxmc.msm.common.network.packet.CPacketGetInfo
import cc.maxmc.msm.common.network.packet.PPacketChildInfo
import cc.maxmc.msm.common.utils.awaitPacket
import cc.maxmc.msm.common.utils.log
import cc.maxmc.msm.common.utils.pluginScope
import cc.maxmc.msm.parent.misc.ChildBungee import cc.maxmc.msm.parent.misc.ChildBungee
import io.netty.channel.Channel
import kotlinx.coroutines.launch
import java.util.concurrent.CopyOnWriteArrayList
object ChildManager { object ChildManager {
val children = ArrayList<ChildBungee>() private val children = CopyOnWriteArrayList<ChildBungee>()
fun registerChild(child: ChildBungee) { fun registerChild(channel: Channel) {
children.add(child) pluginScope.launch {
log("§b| §7正在将 ${channel.remoteAddress()} 注册到集群.")
channel.writeAndFlush(CPacketGetInfo())
val packet = awaitPacket(PPacketChildInfo::class.java) { ch, _ ->
ch == channel
}
val child = ChildBungee(channel, packet.portRange)
}
}
fun unregisterChild(channel: Channel) {
children.removeIf { it.channel == channel }
}
fun requestChild(): ChildBungee {
return children.filter { it.getAvailablePorts().isNotEmpty() }.maxByOrNull { it.getAvailablePorts().size }
?: throw IllegalStateException("当前无可用端口开启新服务器.")
} }
} }

View File

@ -0,0 +1,17 @@
package cc.maxmc.msm.parent.manager
import cc.maxmc.msm.api.misc.ServerInfo
import java.util.concurrent.ConcurrentHashMap
object ServerManager {
val serverMap = ConcurrentHashMap<String, List<ServerInfo>>()
fun getAvailableServers(type: String): List<ServerInfo> {
return serverMap[type]!!.filter { it.isAvailable }
}
private fun requireServer(type: String) {
val child = ChildManager.requestChild()
}
}

View File

@ -1,10 +1,19 @@
package cc.maxmc.msm.parent.misc package cc.maxmc.msm.parent.misc
import cc.maxmc.msm.common.network.BungeePacket import cc.maxmc.msm.common.network.BungeePacket
import cc.maxmc.msm.common.network.packet.CPacketRequestServer
import cc.maxmc.msm.common.utils.awaitPacket
import io.netty.channel.Channel import io.netty.channel.Channel
class ChildBungee(val channel: Channel, var ports: List<Int>, var usedPorts: List<Int> = ArrayList()) { class ChildBungee(val channel: Channel, var ports: Set<Int>, var usedPorts: MutableSet<Int> = HashSet()) {
fun sendPacket(packet: BungeePacket) { private fun sendPacket(packet: BungeePacket) {
channel.writeAndFlush(packet) channel.writeAndFlush(packet)
} }
suspend fun requestServer(type: String) {
sendPacket(CPacketRequestServer(type))
awaitPacket()
}
fun getAvailablePorts() = ports - usedPorts
} }