[PREVIEW] Traversal oriented layout #22264
Test FAILed.
The best conflicts are those where you have already deleted the file in conflict ;)
I don't think we had a specific ticket for this so I added #22387
Refs #22387
Test FAILed.
Test FAILed.
Test FAILed.
53192b2 to ba63c7a
Test FAILed.
@@ -3,14 +3,8 @@
*/
package akka.stream
Investigate what can be salvaged from this spec; see #22434.
))
}

//TODO: Dummy test cases just for smoke-testing. Should be removed.
Remove test cases below, they were used as quick smoke tests. No longer necessary.

Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber), Source.fromPublisher(publisher))
})
// TODO: Reenable these tests
ticket for this: #22435

override def named(name: String): Graph[S, Mat] = addAttributes(Attributes.name(name))
}
}

/**
This was not used anywhere actually.
It was introduced for GearPump in drewhk@1d692da
Perhaps obsolete now?
I am not sure (I don't know what this was used for), but it will not work with the new layout this way. I think it can only be fixed once I understand the use case.
let's leave it out, everything is different anyway
* attribute can be used to declare subgraph boundaries across which the graph
* shall not be fused.
*/
object Fusing {
This had to be removed because the FusedGraph and StructuralInfoModule types no longer exist and emulating them seems to be very hard. See: #22431
There is no point in pre-fusing anymore, it won't be faster.
final case class GraphStageModule(
shape: Shape,
// TODO: Fix variance issues
final case class GraphStageModule[+S <: Shape @uncheckedVariance, +M](
Can someone tell me why this does not work anymore? Or is this the right way to fix it?
if (attributes ne this.attributes) new GraphStageModule(shape, attributes, stage)
else this

override private[stream] def traversalBuilder = LinearTraversalBuilder.fromModule(this)
see #22427
*
* This source is not reusable, it is only created internally.
*/
final class MaterializedValueSource[T](val computation: MaterializedValueNode, val out: Outlet[T]) extends GraphStage[SourceShape[T]] {
gone for good!
interpreter.attachDownstreamBoundary(connections(length), downstream)
interpreter.init(null)

// TODO: Fix this (assembly is gone)
see #22433
@@ -643,29 +643,6 @@ final class SubSink[T](name: String, externalCallback: ActorSubscriberMessage
override def toString: String = name
}

object SubSource {
this was not used anywhere
It was in Akka HTTP actually
Not needed anymore, I replaced it there.
Mostly small comments; feel free to create tickets if something is not a quick fix.
}

private case class SegmentInfo(
globalislandOffset: Int, // The island to which the segment belongs
Camel case on island? globalIslandOffset
yep, true
}

private case class ForwardWire(
islandGlobalOffset: Int,
globalislandOffset vs islandGlobalOffset?
right, these names should be consistent. Will fix.
import scala.concurrent.{ ExecutionContextExecutor, Future }
import scala.util.Random

object PhasedFusingActorMaterializer {
In Banff you showed a nice sketch of what islands and segments are. Could you describe that here somewhere?
Yes, I will add internal documentation after things are fixed. I will create a ticket.

}

case class PhasedFusingActorMaterializer(
Any reason this is a case class? final?
Yes it should be final. Probably does not need to be a case class either.
* in a fixed location, therefore the last step of the Traversal might need to be changed in those cases from the
* -1 relative offset to something else (see rewireLastOutTo).
*/
final case class LinearTraversalBuilder(
Make everything in impl INTERNAL API, unless intended otherwise.
yep
val builderKey = new BuilderKey

val newBuildSteps =
if (combineMat == Keep.left) {
We convert those earlier in the javadsl so we don't have to care about javadsl.
Perhaps worth a comment.
Ok
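For readers following the thread, here is a minimal sketch of why a check like `combineMat == Keep.left` can work at all: function values compare by reference, so it relies on the combiner being one shared instance. `KeepSketch`, `MatStep`, `TakeLeft`, `TakeRight`, `Compute` and `stepFor` are hypothetical names for illustration only, not the actual Akka internals.

```scala
object KeepSketch {
  // Hypothetical, simplified stand-in for akka.stream.scaladsl.Keep:
  // each combiner is a single shared function instance.
  private val _left: (Any, Any) => Any = (l, _) => l
  private val _right: (Any, Any) => Any = (_, r) => r
  def left[L, R]: (L, R) => L = _left.asInstanceOf[(L, R) => L]
  def right[L, R]: (L, R) => R = _right.asInstanceOf[(L, R) => R]

  // Hypothetical build steps; the real traversal has its own representation.
  sealed trait MatStep
  case object TakeLeft extends MatStep
  case object TakeRight extends MatStep
  final case class Compute(f: (Any, Any) => Any) extends MatStep

  // The shared instances are recognised by reference and mapped to a
  // cheaper step that never invokes the combiner function.
  def stepFor[A, B, C](combine: (A, B) => C): MatStep =
    if (combine eq left[A, B]) TakeLeft
    else if (combine eq right[A, B]) TakeRight
    else Compute(combine.asInstanceOf[(Any, Any) => Any])
}
```

A combiner that behaves like `Keep.left` but is a different function instance would fall through to the generic branch, which is why converting javadsl combiners to the shared instances beforehand matters.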
newInOffsets = newInOffsets.updated(in, inSlots + submodule.offsetOf(in.mappedTo))
}

val outIterator = shape.outlets.iterator
If there are cases where inlets or outlets can be empty, it might be worth checking nonEmpty before creating the iterator; note that even Nil.iterator creates a new instance.
good one, yes.
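The suggested guard could look roughly like the sketch below. `OutletWiring`, the simplified `Outlet` case class and `assignOffsets` are hypothetical stand-ins, not the code under review; the only point is that the iterator is created solely when there is something to iterate.

```scala
object OutletWiring {
  // Hypothetical stand-in for akka.stream.Outlet; only identity matters here.
  final case class Outlet(name: String)

  // Assigns consecutive slot offsets starting at `base`.
  // The nonEmpty check skips iterator creation for shapes without outlets.
  def assignOffsets(outlets: List[Outlet], base: Int): Map[Outlet, Int] = {
    var offsets = Map.empty[Outlet, Int]
    if (outlets.nonEmpty) {
      val it = outlets.iterator
      var i = 0
      while (it.hasNext) {
        offsets = offsets.updated(it.next(), base + i)
        i += 1
      }
    }
    offsets
  }
}
```

Whether the saved allocation is measurable depends on how often empty shapes occur and on the Scala version in use, so this is best treated as a micro-optimization to verify with a benchmark.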
@@ -27,7 +28,10 @@ import scala.compat.java8.FutureConverters._
* an “atomic” source, e.g. from a collection or a file. Materialization turns a Source into
* a Reactive Streams `Publisher` (at least conceptually).
*/
final class Source[+Out, +Mat](override val module: Module)
final class Source[+Out, +Mat](
override val traversalBuilder: LinearTraversalBuilder,
What should be public? Exposing impl as public seems wrong.
Hm, strange that it was public before. Maybe Gearpump needed it? We can hide it now, as a custom phase will no longer need to know about the internal representation. Thanks for pointing that out.
this.mapMaterializedValue((_) ⇒ NotUsed).asInstanceOf[Source[T, Mat3]]
else
this.mapMaterializedValue(combine(_, NotUsed.asInstanceOf[Mat2])).asInstanceOf[Source[T, Mat3]]
if (flow.traversalBuilder eq Flow.identityTraversalBuilder) {
what happened with the Keep optimizations here?
They are implemented in the traversal now.
This PR is only intended for reviews of the initial commit. Development is going on in |