Skip to content

Conversation

drewhk
Copy link
Contributor

@drewhk drewhk commented Feb 6, 2017

No description provided.

@akka-ci akka-ci added validating PR is currently being validated by Jenkins needs-attention Indicates a PR validation failure (set by CI infrastructure) and removed validating PR is currently being validated by Jenkins labels Feb 6, 2017
@akka-ci
Copy link

akka-ci commented Feb 6, 2017

Test FAILed.

@drewhk
Copy link
Contributor Author

drewhk commented Feb 6, 2017

The best conflicts are those where you have already deleted the file in conflict ;)

@patriknw
Copy link
Contributor

I don't think we had a specific ticket for this so I added #22387

@patriknw
Copy link
Contributor

Refs #22387

@akka-ci akka-ci added validating PR is currently being validated by Jenkins needs-attention Indicates a PR validation failure (set by CI infrastructure) and removed needs-attention Indicates a PR validation failure (set by CI infrastructure) validating PR is currently being validated by Jenkins labels Mar 1, 2017
@akka-ci
Copy link

akka-ci commented Mar 1, 2017

Test FAILed.

@akka-ci akka-ci added validating PR is currently being validated by Jenkins needs-attention Indicates a PR validation failure (set by CI infrastructure) and removed needs-attention Indicates a PR validation failure (set by CI infrastructure) labels Mar 1, 2017
@akka-ci
Copy link

akka-ci commented Mar 1, 2017

Test FAILed.

@akka-ci akka-ci added validating PR is currently being validated by Jenkins needs-attention Indicates a PR validation failure (set by CI infrastructure) and removed validating PR is currently being validated by Jenkins needs-attention Indicates a PR validation failure (set by CI infrastructure) labels Mar 1, 2017
@akka-ci
Copy link

akka-ci commented Mar 1, 2017

Test FAILed.

@drewhk drewhk force-pushed the wip-traversal-oriented-layout-drewhk branch from 53192b2 to ba63c7a Compare March 1, 2017 15:27
@akka-ci akka-ci added validating PR is currently being validated by Jenkins needs-attention Indicates a PR validation failure (set by CI infrastructure) and removed needs-attention Indicates a PR validation failure (set by CI infrastructure) labels Mar 1, 2017
@akka-ci
Copy link

akka-ci commented Mar 1, 2017

Test FAILed.

@akka-ci akka-ci removed the validating PR is currently being validated by Jenkins label Mar 1, 2017
@@ -3,14 +3,8 @@
*/
package akka.stream
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Investigate what can be salvaged from this Spec. see #22434

))
}

//TODO: Dummy test cases just for smoke-testing. Should be removed.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove test cases below, they were used as quick smoke tests. No longer necessary.


Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber), Source.fromPublisher(publisher))
})
// TODO: Reenable these tests
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ticket for this: #22435


override def named(name: String): Graph[S, Mat] = addAttributes(Attributes.name(name))
}
}

/**
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was not used anywhere actually.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was introduced for GearPump in drewhk@1d692da
Perhaps obsolete now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure, (I don't know what this was used for), but it will not work with the new layout this way. This can only be fixed I think if I understand what is the use case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's leave it out, everything is different anyway

* attribute can be used to declare subgraph boundaries across which the graph
* shall not be fused.
*/
object Fusing {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This had to be removed because the FusedGraph and StructuralInfoModule types no longer exist and emulating them seems to be very hard. See: #22431

There is no point in pre-fusing anymore, it won't be faster.

final case class GraphStageModule(
shape: Shape,
// TODO: Fix variance issues
final case class GraphStageModule[+S <: Shape @uncheckedVariance, +M](
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

someone can maybe tell me why this does not work anymore. Or is this the right way to fix it.

if (attributes ne this.attributes) new GraphStageModule(shape, attributes, stage)
else this

override private[stream] def traversalBuilder = LinearTraversalBuilder.fromModule(this)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see #22427

*
* This source is not reusable, it is only created internally.
*/
final class MaterializedValueSource[T](val computation: MaterializedValueNode, val out: Outlet[T]) extends GraphStage[SourceShape[T]] {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gone for good!

interpreter.attachDownstreamBoundary(connections(length), downstream)
interpreter.init(null)

// TODO: Fix this (assembly is gone)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see #22433

@@ -643,29 +643,6 @@ final class SubSink[T](name: String, externalCallback: ActorSubscriberMessage
override def toString: String = name
}

object SubSource {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was not used anywhere

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was in Akka HTTP actually

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed anymore, I replaced it there.

Copy link
Contributor

@patriknw patriknw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mostly small comments, feel free to create tickets if something is not quick fix


override def named(name: String): Graph[S, Mat] = addAttributes(Attributes.name(name))
}
}

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was introduced for GearPump in drewhk@1d692da
Perhaps obsolete now?

}

private case class SegmentInfo(
globalislandOffset: Int, // The island to which the segment belongs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

camel case on island?
globalIslandOffset

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, true

}

private case class ForwardWire(
islandGlobalOffset: Int,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

globalislandOffset vs islandGlobalOffset?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, these names should be consistent. Will fix.

import scala.concurrent.{ ExecutionContextExecutor, Future }
import scala.util.Random

object PhasedFusingActorMaterializer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Banff you showed a nice sketch of what islands and segments are. Could you describe that here somewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will add internal documentation after things are fixed. I create a ticket.


}

case class PhasedFusingActorMaterializer(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason this is a case class?
final ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it should be final. Probably does not need to be a case class either.

* in a fixed location, therefore the last step of the Traversal might need to be changed in those cases from the
* -1 relative offset to something else (see rewireLastOutTo).
*/
final case class LinearTraversalBuilder(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make everything in impl INTERNAL API, unless intended otherwise

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep

val builderKey = new BuilderKey

val newBuildSteps =
if (combineMat == Keep.left) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We convert those earlier in the javadsl so we don't have to care about javadsl.
Perhaps worth a comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

newInOffsets = newInOffsets.updated(in, inSlots + submodule.offsetOf(in.mappedTo))
}

val outIterator = shape.outlets.iterator
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there are cases where inlets or outlets can be empty it might be worth checking nonEmpty before creating the iterator, note that even Nil.iterator creates a new instance

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good one, yes.

@@ -27,7 +28,10 @@ import scala.compat.java8.FutureConverters._
* an “atomic” source, e.g. from a collection or a file. Materialization turns a Source into
* a Reactive Streams `Publisher` (at least conceptually).
*/
final class Source[+Out, +Mat](override val module: Module)
final class Source[+Out, +Mat](
override val traversalBuilder: LinearTraversalBuilder,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what should be public? exposing impl as public seems wrong

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, strange why it was public before. Maybe Gearpump needed it? We can make it hidden now as a custom phase will no longer need to know about the internal representation. Thanks for pointing out.

this.mapMaterializedValue((_) ⇒ NotUsed).asInstanceOf[Source[T, Mat3]]
else
this.mapMaterializedValue(combine(_, NotUsed.asInstanceOf[Mat2])).asInstanceOf[Source[T, Mat3]]
if (flow.traversalBuilder eq Flow.identityTraversalBuilder) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happened with the Keep optimizations here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are implemented in the traversal now.

@patriknw
Copy link
Contributor

patriknw commented Mar 6, 2017

This PR is only intended for reviews of the initial commit. Development is going on in wip-traversal-oriented-layout-drewhk and new PRs will be opened to that branch.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
needs-attention Indicates a PR validation failure (set by CI infrastructure)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants