Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
logs
target
metals.sbt
.metals/
.vscode/
.bsp/
.bloop/
/.idea
/.idea_modules
/.classpath
Expand Down
74 changes: 54 additions & 20 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import Dependencies.AkkaVersion
import Dependencies.Scala212
import Dependencies.Scala213
import play.core.PlayVersion.{ current => playVersion }
import Dependencies.Scala3
import Dependencies.PlayVersion
import Dependencies.PlayVersion212

lazy val runChromeWebDriver = taskKey[Unit]("Run the chromewebdriver tests")

Expand All @@ -12,6 +14,38 @@ val akkaCluster = Seq(
"com.typesafe.akka" %% "akka-cluster-tools" % AkkaVersion
)

val play212Dependencies = Seq(
"com.typesafe.play" %% "play" % PlayVersion212,
// Test dependencies for running a Play server
"com.typesafe.play" %% "play-akka-http-server" % PlayVersion212 % Test,
"com.typesafe.play" %% "play-logback" % PlayVersion212 % Test,
// Test dependencies for Scala/Java dependency injection
"com.typesafe.play" %% "play-guice" % PlayVersion212 % Test,
)

val playDependencies = Seq(
// Production dependencies
"com.typesafe.play" %% "play" % PlayVersion,
// Test dependencies for running a Play server
"com.typesafe.play" %% "play-akka-http-server" % PlayVersion % Test,
"com.typesafe.play" %% "play-logback" % PlayVersion % Test,
// Test dependencies for Scala/Java dependency injection
"com.typesafe.play" %% "play-guice" % PlayVersion % Test,
)

val commonDependencies = Seq(
// Production dependencies
"com.typesafe.akka" %% "akka-remote" % AkkaVersion,
// Test dependencies for Scala/Java dependency injection
macwire % Test,
// Test dependencies for running chrome driver
"io.github.bonigarcia" % "webdrivermanager" % "5.3.2" % Test,
"org.seleniumhq.selenium" % "selenium-chrome-driver" % "4.9.1" % Test,
// Test framework dependencies
"org.scalatest" %% "scalatest" % "3.2.16" % Test,
"com.novocode" % "junit-interface" % "0.11" % Test
)

// Customise sbt-dynver's behaviour to make it work with tags which aren't v-prefixed
(ThisBuild / dynverVTagPrefix) := false

Expand All @@ -21,27 +55,21 @@ lazy val root = (project in file("."))
name := "play-socket-io",
mimaPreviousArtifacts := Set.empty, // TODO: enable after first release
scalaVersion := Scala213,
crossScalaVersions := Seq(Scala213, Scala212),
scalacOptions ++= Seq("-feature", "-release", "11"),
crossScalaVersions := Seq(Scala213, Scala212, Scala3),
scalacOptions ++= {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((3, n)) => Seq("-feature", "-release", "11")
case _ => Seq("-feature", "-release", "11", "-Xsource:3")
}
},
(Compile / doc / scalacOptions) := Nil,
javacOptions ++= Seq("-Xlint"),
libraryDependencies ++= Seq(
// Production dependencies
"com.typesafe.play" %% "play" % playVersion,
"com.typesafe.akka" %% "akka-remote" % AkkaVersion,
// Test dependencies for running a Play server
"com.typesafe.play" %% "play-akka-http-server" % playVersion % Test,
"com.typesafe.play" %% "play-logback" % playVersion % Test,
// Test dependencies for Scala/Java dependency injection
"com.typesafe.play" %% "play-guice" % playVersion % Test,
macwire % Test,
// Test dependencies for running chrome driver
"io.github.bonigarcia" % "webdrivermanager" % "5.3.2" % Test,
"org.seleniumhq.selenium" % "selenium-chrome-driver" % "4.9.1" % Test,
// Test framework dependencies
"org.scalatest" %% "scalatest" % "3.1.2" % Test,
"com.novocode" % "junit-interface" % "0.11" % Test
),
libraryDependencies ++= {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, 12)) => commonDependencies ++ play212Dependencies
case _ => commonDependencies ++ playDependencies
}
},
(Compile / PB.targets) := Seq(
scalapb.gen() -> (Compile / sourceManaged).value
),
Expand Down Expand Up @@ -76,6 +104,7 @@ lazy val scalaChat = (project in file("samples/scala/chat"))
name := "play-socket.io-scala-chat-example",
organization := "com.typesafe.play",
scalaVersion := Scala213,
crossScalaVersions := Seq(Scala213, Scala212, Scala3),
libraryDependencies += macwire % Provided
)

Expand All @@ -86,6 +115,7 @@ lazy val scalaMultiRoomChat = (project in file("samples/scala/multi-room-chat"))
name := "play-socket.io-scala-multi-room-chat-example",
organization := "com.typesafe.play",
scalaVersion := Scala213,
crossScalaVersions := Seq(Scala213, Scala212, Scala3),
libraryDependencies += macwire % Provided
)

Expand All @@ -96,6 +126,7 @@ lazy val scalaClusteredChat = (project in file("samples/scala/clustered-chat"))
name := "play-socket.io-scala-clustered-chat-example",
organization := "com.typesafe.play",
scalaVersion := Scala213,
crossScalaVersions := Seq(Scala213, Scala212, Scala3),
libraryDependencies ++= Seq(macwire % Provided) ++ akkaCluster
)

Expand All @@ -106,6 +137,7 @@ lazy val javaChat = (project in file("samples/java/chat"))
name := "play-socket.io-java-chat-example",
organization := "com.typesafe.play",
scalaVersion := Scala213,
crossScalaVersions := Seq(Scala213, Scala212, Scala3),
libraryDependencies += guice
)

Expand All @@ -116,6 +148,7 @@ lazy val javaMultiRoomChat = (project in file("samples/java/multi-room-chat"))
name := "play-socket.io-java-multi-room-chat-example",
organization := "com.typesafe.play",
scalaVersion := Scala213,
crossScalaVersions := Seq(Scala213, Scala212, Scala3),
libraryDependencies ++= Seq(guice, lombok)
)

Expand All @@ -126,6 +159,7 @@ lazy val javaClusteredChat = (project in file("samples/java/clustered-chat"))
name := "play-socket.io-java-clustered-chat-example",
organization := "com.typesafe.play",
scalaVersion := Scala213,
crossScalaVersions := Seq(Scala213, Scala212, Scala3),
libraryDependencies ++= Seq(guice, lombok) ++ akkaCluster
)

Expand Down
10 changes: 6 additions & 4 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
object Dependencies {
// Sync with GA (.github/workflows/build-test.yml)
val Scala212 = "2.12.17" // sync! see comment above
val Scala213 = "2.13.10" // sync! see comment above

val AkkaVersion = "2.5.32"
val Scala212 = "2.12.18" // sync! see comment above
val Scala213 = "2.13.11" // sync! see comment above
val Scala3 = "3.3.0-RC6"
val AkkaVersion = "2.6.20"
val PlayVersion = "2.9.0-M4"
val PlayVersion212 = "2.8.9"
}
6 changes: 4 additions & 2 deletions src/main/scala/play/engineio/EngineIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ import play.api.inject.Module
import play.api.mvc._
import play.engineio.EngineIOManagerActor._
import play.engineio.protocol._
import play.engineio.protocol.EngineIOTransport

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
Expand Down Expand Up @@ -86,8 +88,8 @@ final class EngineIOController(
)(implicit ec: ExecutionContext)
extends AbstractController(controllerComponents) {

private val log = Logger(classOf[EngineIOController])
private implicit val timeout = Timeout(config.pingTimeout)
private val log = Logger(classOf[EngineIOController])
private implicit val timeout: Timeout = Timeout(config.pingTimeout)

/**
* The endpoint to route to from a router.
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/play/engineio/EngineIOManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class EngineIOManagerActor(config: EngineIOConfig, sessionProps: Props) extends
case Some(sessionActor) =>
sessionActor.tell(close, sender())
case None =>
sender ! Done
sender() ! Done
}

case message: SessionMessage =>
Expand Down
37 changes: 19 additions & 18 deletions src/main/scala/play/engineio/EngineIOSessionActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import akka.actor.ActorRef
import akka.actor.DeadLetterSuppression
import akka.actor.Props
import akka.actor.Status
import akka.util.Timeout
import akka.pattern.pipe
import akka.stream._
import akka.stream.scaladsl._
Expand Down Expand Up @@ -154,11 +155,11 @@ class EngineIOSessionActor[SessionData](
case Close(_, transport, requestId) =>
debug(requestId, transport, "Requested to close session")
context.stop(self)
sender ! Done
sender() ! Done

case message: SessionMessage =>
debug(message.requestId, message.transport, s"Unknown session id")
sender ! Status.Failure(UnknownSessionId(sid))
sender() ! Status.Failure(UnknownSessionId(sid))
context.stop(self)
}

Expand All @@ -170,9 +171,9 @@ class EngineIOSessionActor[SessionData](
.recover { case e =>
ConnectionRefused(e)
}
.pipeTo(self)(sender)
.pipeTo(self)(sender())
} catch {
case NonFatal(e) => self.tell(ConnectionRefused(e), sender)
case NonFatal(e) => self.tell(ConnectionRefused(e), sender())
}
context.become(connecting)
}
Expand All @@ -183,7 +184,7 @@ class EngineIOSessionActor[SessionData](

case Connected(flow, requestId) =>
log.debug("{} - Connection successful", sid)
sender ! Packets(
sender() ! Packets(
sid,
activeTransport,
Seq(
Expand All @@ -203,11 +204,11 @@ class EngineIOSessionActor[SessionData](
doConnect(flow)

case ConnectionRefused(e) =>
sender ! Status.Failure(e)
sender() ! Status.Failure(e)

case Close(_, transport, requestId) =>
debug(requestId, transport, "Requested to close session")
sender ! Done
sender() ! Done
context.stop(self)
}

Expand Down Expand Up @@ -237,8 +238,8 @@ class EngineIOSessionActor[SessionData](
.expand(_.iterator)
.via(flow)
.batch(4, messagesToPackets)(_ ++ messagesToPackets(_))
.toMat(Sink.queue[Seq[EngineIOPacket]])(Keep.both)
.run
.toMat(Sink.queue[Seq[EngineIOPacket]]())(Keep.both)
.run()

sourceQueue = sourceQ
sinkQueue = sinkQ
Expand All @@ -264,26 +265,26 @@ class EngineIOSessionActor[SessionData](
existingRequestId
)
existing ! Close(sid, transport, requestId)
storeRequester(sender, requestId, transport)
storeRequester(sender(), requestId, transport)

case None if transport != activeTransport =>
packetsToSendByNonActiveTransport.get(transport) match {
case Some(packets) =>
debug(requestId, transport, "Sending message to non active transport")
sender ! Packets(sid, transport, packets, requestId)
sender() ! Packets(sid, transport, packets, requestId)
case None =>
debug(requestId, transport, "Retrieve from non active transport, waiting")
storeRequester(sender, requestId, transport)
storeRequester(sender(), requestId, transport)
}

case None if packetsToSend.isEmpty =>
debug(requestId, transport, "Retrieve while no packets, waiting")
storeRequester(sender, requestId, transport)
storeRequester(sender(), requestId, transport)

case None =>
// We have packets to send
debug(requestId, transport, "Retrieve with {} packets to send", packetsToSend.length)
sender ! Packets(sid, transport, packetsToSend, requestId)
sender() ! Packets(sid, transport, packetsToSend, requestId)
packetsToSend = Nil
requestMorePackets()
}
Expand Down Expand Up @@ -378,14 +379,14 @@ class EngineIOSessionActor[SessionData](
if (messagesToPush.nonEmpty) {
if (currentlyPushingMessages) {
messagesReceived ++= messagesToPush
messagesReceivedSenders += sender
messagesReceivedSenders += sender()
} else {
sourceQueue.offer(messagesToPush).map(_ => MessagesPushed).pipeTo(self)
currentlyPushingMessages = true
sender ! Done
sender() ! Done
}
} else {
sender ! Done
sender() ! Done
}
}

Expand Down Expand Up @@ -456,7 +457,7 @@ class EngineIOSessionActor[SessionData](
debug(requestId, transport, "Telling poller to go away to work around engine.io upgrade race condition")
requester ! Packets(sid, Polling, Seq(Utf8EngineIOPacket(EngineIOPacketType.Noop, "")), requestId)
} else {
retrieveRequesters += (transport -> RetrieveRequester(sender, requestId))
retrieveRequesters += (transport -> RetrieveRequester(sender(), requestId))
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/play/socketio/SocketIOSessionFlow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ private class SocketIOSessionStage[SessionData](
namespace,
connectToNamespaceCallback.applyOrElse(
(session, ns),
{ _: (SocketIOSession[SessionData], String) => throw NamespaceNotFound(namespace) }
{ (_: (SocketIOSession[SessionData], String)) => throw NamespaceNotFound(namespace) }
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ object SocketIOPacket {
}

TextEngineIOMessage(message.toString) ::
extraPackets.map(BinaryEngineIOMessage)
extraPackets.map(play.engineio.BinaryEngineIOMessage.apply)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ object SocketIOEventCodec {
override def applyOrElse[A1 <: SocketIOEvent, B1 >: T](event: A1, default: (A1) => B1) = {
decoders
.andThen(decoder => decoder.apply(event))
.applyOrElse(event.name, { _: String => default(event) })
.applyOrElse(event.name, { (_: String) => default(event) })
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class SocketIOEventCodecSpec extends AnyWordSpec with Matchers with OptionValues
val encoder = encodeByType[Any] {
case _: String => "string" -> encodeJson[String]
case _: Int =>
"int" -> encodeJson[String].compose { i: Int => i.toString }
"int" -> encodeJson[String].compose { (i: Int) => i.toString }
}

val e1 = encoder("arg")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class TestSocketIOScalaApplication(initialSettings: Map[String, AnyRef]) extends

val (testDisconnectQueue, testDisconnectFlow) = {
val (sourceQueue, source) =
Source.queue[SocketIOEvent](10, OverflowStrategy.backpressure).toMat(BroadcastHub.sink)(Keep.both).run
Source.queue[SocketIOEvent](10, OverflowStrategy.backpressure).toMat(BroadcastHub.sink)(Keep.both).run()
(sourceQueue, Flow.fromSinkAndSource(Sink.ignore, source))
}

Expand Down