core,netty: use PerfMark tags with the HTTP/2 stream ids

This change removes the PerfMark linking from WriteQueue and moves it into
the handler for each individual command, so that the trace is more precise
and the tag information is correct.

It is still unclear what the initial Tag should be for ClientCallImpl,
since it should not access the TransportState to get the HTTP/2 stream id.
diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java
index 886fa0f..f279d11 100644
--- a/core/src/main/java/io/grpc/internal/ServerImpl.java
+++ b/core/src/main/java/io/grpc/internal/ServerImpl.java
@@ -466,7 +466,7 @@
 
     @Override
     public void streamCreated(ServerStream stream, String methodName, Metadata headers) {
-      Tag tag = PerfMark.createTag(methodName, stream.hashCode());
+      Tag tag = PerfMark.createTag(methodName, stream.streamId());
       PerfMark.startTask("ServerTransportListener.streamCreated", tag);
       try {
         streamCreatedInternal(stream, methodName, headers, tag);
diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java
index ea6c43a..dd6aa46 100644
--- a/core/src/test/java/io/grpc/internal/ServerImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java
@@ -483,10 +483,12 @@
 
     transportListener.streamCreated(stream, "Waiter/nonexist", requestHeaders);
 
+    verify(stream).streamId();
     verify(stream).close(statusCaptor.capture(), any(Metadata.class));
     Status status = statusCaptor.getValue();
     assertEquals(Status.Code.UNIMPLEMENTED, status.getCode());
     assertEquals("Can't find decompressor for " + decompressorName, status.getDescription());
+
     verifyNoMoreInteractions(stream);
   }
 
@@ -786,6 +788,7 @@
     when(stream.statsTraceContext()).thenReturn(statsTraceCtx);
 
     transportListener.streamCreated(stream, "Waiter/serve", requestHeaders);
+    verify(stream).streamId();
     verify(stream).setListener(streamListenerCaptor.capture());
     ServerStreamListener streamListener = streamListenerCaptor.getValue();
     assertNotNull(streamListener);
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
index 5cf74d8..6d5ea8c 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
@@ -74,6 +74,8 @@
 import io.netty.handler.codec.http2.StreamBufferingEncoder;
 import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
 import io.netty.handler.logging.LogLevel;
+import io.perfmark.PerfMark;
+import io.perfmark.Tag;
 import java.nio.channels.ClosedChannelException;
 import java.util.concurrent.Executor;
 import java.util.logging.Level;
@@ -355,6 +357,7 @@
     // Stream 1 is reserved for the Upgrade response, so we should ignore its headers here:
     if (streamId != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
       NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
+      PerfMark.event("NettyClientHandler.onHeadersRead", stream.tag());
       stream.transportHeadersReceived(headers, endStream);
     }
 
@@ -369,6 +372,7 @@
   private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) {
     flowControlPing().onDataRead(data.readableBytes(), padding);
     NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
+    PerfMark.event("NettyClientHandler.onDataRead", stream.tag());
     stream.transportDataReceived(data, endOfStream);
     if (keepAliveManager != null) {
       keepAliveManager.onDataReceived();
@@ -381,6 +385,7 @@
   private void onRstStreamRead(int streamId, long errorCode) {
     NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
     if (stream != null) {
+      PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag());
       Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode)
           .augmentDescription("Received Rst Stream");
       stream.transportReportStatus(
@@ -508,7 +513,7 @@
    * Attempts to create a new stream from the given command. If there are too many active streams,
    * the creation request is queued.
    */
-  private void createStream(final CreateStreamCommand command, final ChannelPromise promise)
+  private void createStream(CreateStreamCommand command, ChannelPromise promise)
           throws Exception {
     if (lifecycleManager.getShutdownThrowable() != null) {
       command.stream().setNonExistent();
@@ -521,7 +526,7 @@
     }
 
     // Get the stream ID for the new stream.
-    final int streamId;
+    int streamId;
     try {
       streamId = incrementAndGetNextStreamId();
     } catch (StatusException e) {
@@ -539,54 +544,71 @@
       return;
     }
 
-    final NettyClientStream.TransportState stream = command.stream();
-    final Http2Headers headers = command.headers();
+    NettyClientStream.TransportState stream = command.stream();
+    Http2Headers headers = command.headers();
     stream.setId(streamId);
 
+    PerfMark.startTask("NettyClientHandler.createStream", stream.tag());
+    command.getLink().link();
+    try {
+      createStreamTraced(
+          streamId, stream, headers, command.isGet(), command.shouldBeCountedForInUse(), promise);
+    } finally {
+      PerfMark.stopTask("NettyClientHandler.createStream", stream.tag());
+    }
+  }
+
+  private void createStreamTraced(
+      final int streamId,
+      final NettyClientStream.TransportState stream,
+      final Http2Headers headers,
+      boolean isGet,
+      final boolean shouldBeCountedForInUse,
+      final ChannelPromise promise) {
     // Create an intermediate promise so that we can intercept the failure reported back to the
     // application.
     ChannelPromise tempPromise = ctx().newPromise();
-    encoder().writeHeaders(ctx(), streamId, headers, 0, command.isGet(), tempPromise)
-            .addListener(new ChannelFutureListener() {
-              @Override
-              public void operationComplete(ChannelFuture future) throws Exception {
-                if (future.isSuccess()) {
-                  // The http2Stream will be null in case a stream buffered in the encoder
-                  // was canceled via RST_STREAM.
-                  Http2Stream http2Stream = connection().stream(streamId);
-                  if (http2Stream != null) {
-                    stream.getStatsTraceContext().clientOutboundHeaders();
-                    http2Stream.setProperty(streamKey, stream);
+    encoder().writeHeaders(ctx(), streamId, headers, 0, isGet, tempPromise)
+        .addListener(new ChannelFutureListener() {
+          @Override
+          public void operationComplete(ChannelFuture future) throws Exception {
+            if (future.isSuccess()) {
+              // The http2Stream will be null in case a stream buffered in the encoder
+              // was canceled via RST_STREAM.
+              Http2Stream http2Stream = connection().stream(streamId);
+              if (http2Stream != null) {
+                stream.getStatsTraceContext().clientOutboundHeaders();
+                http2Stream.setProperty(streamKey, stream);
 
-                    // This delays the in-use state until the I/O completes, which technically may
-                    // be later than we would like.
-                    if (command.shouldBeCountedForInUse()) {
-                      inUseState.updateObjectInUse(http2Stream, true);
-                    }
-
-                    // Attach the client stream to the HTTP/2 stream object as user data.
-                    stream.setHttp2Stream(http2Stream);
-                  }
-                  // Otherwise, the stream has been cancelled and Netty is sending a
-                  // RST_STREAM frame which causes it to purge pending writes from the
-                  // flow-controller and delete the http2Stream. The stream listener has already
-                  // been notified of cancellation so there is nothing to do.
-
-                  // Just forward on the success status to the original promise.
-                  promise.setSuccess();
-                } else {
-                  final Throwable cause = future.cause();
-                  if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
-                    StreamBufferingEncoder.Http2GoAwayException e =
-                        (StreamBufferingEncoder.Http2GoAwayException) cause;
-                    lifecycleManager.notifyShutdown(statusFromGoAway(e.errorCode(), e.debugData()));
-                    promise.setFailure(lifecycleManager.getShutdownThrowable());
-                  } else {
-                    promise.setFailure(cause);
-                  }
+                // This delays the in-use state until the I/O completes, which technically may
+                // be later than we would like.
+                if (shouldBeCountedForInUse) {
+                  inUseState.updateObjectInUse(http2Stream, true);
                 }
+
+                // Attach the client stream to the HTTP/2 stream object as user data.
+                stream.setHttp2Stream(http2Stream);
               }
-            });
+              // Otherwise, the stream has been cancelled and Netty is sending a
+              // RST_STREAM frame which causes it to purge pending writes from the
+              // flow-controller and delete the http2Stream. The stream listener has already
+              // been notified of cancellation so there is nothing to do.
+
+              // Just forward on the success status to the original promise.
+              promise.setSuccess();
+            } else {
+              final Throwable cause = future.cause();
+              if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
+                StreamBufferingEncoder.Http2GoAwayException e =
+                    (StreamBufferingEncoder.Http2GoAwayException) cause;
+                lifecycleManager.notifyShutdown(statusFromGoAway(e.errorCode(), e.debugData()));
+                promise.setFailure(lifecycleManager.getShutdownThrowable());
+              } else {
+                promise.setFailure(cause);
+              }
+            }
+          }
+        });
   }
 
   /**
@@ -595,14 +617,20 @@
   private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd,
       ChannelPromise promise) {
     NettyClientStream.TransportState stream = cmd.stream();
-    Status reason = cmd.reason();
-    if (reason != null) {
-      stream.transportReportStatus(reason, true, new Metadata());
-    }
-    if (!cmd.stream().isNonExistent()) {
-      encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
-    } else {
-      promise.setSuccess();
+    PerfMark.startTask("NettyClientHandler.cancelStream", stream.tag());
+    cmd.getLink().link();
+    try {
+      Status reason = cmd.reason();
+      if (reason != null) {
+        stream.transportReportStatus(reason, true, new Metadata());
+      }
+      if (!cmd.stream().isNonExistent()) {
+        encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
+      } else {
+        promise.setSuccess();
+      }
+    } finally {
+      PerfMark.stopTask("NettyClientHandler.cancelStream", stream.tag());
     }
   }
 
@@ -611,16 +639,33 @@
    */
   private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
       ChannelPromise promise) {
-    // Call the base class to write the HTTP/2 DATA frame.
-    // Note: no need to flush since this is handled by the outbound flow controller.
-    encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise);
+    PerfMark.startTask("NettyClientHandler.sendGrpcFrame", cmd.stream().tag());
+    cmd.getLink().link();
+    try {
+      // Call the base class to write the HTTP/2 DATA frame.
+      // Note: no need to flush since this is handled by the outbound flow controller.
+      encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
+    } finally {
+      PerfMark.stopTask("NettyClientHandler.sendGrpcFrame", cmd.stream().tag());
+    }
+  }
+
+  private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg,
+      ChannelPromise promise) {
+    PerfMark.startTask("NettyClientHandler.sendPingFrame");
+    msg.getLink().link();
+    try {
+      sendPingFrameTraced(ctx, msg, promise);
+    } finally {
+      PerfMark.stopTask("NettyClientHandler.sendPingFrame");
+    }
   }
 
   /**
    * Sends a PING frame. If a ping operation is already outstanding, the callback in the message is
    * registered to be called when the existing operation completes, and no new frame is sent.
    */
-  private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg,
+  private void sendPingFrameTraced(ChannelHandlerContext ctx, SendPingCommand msg,
       ChannelPromise promise) {
     // Don't check lifecycleManager.getShutdownStatus() since we want to allow pings after shutdown
     // but before termination. After termination, messages will no longer arrive because the
@@ -690,12 +735,19 @@
       @Override
       public boolean visit(Http2Stream stream) throws Http2Exception {
         NettyClientStream.TransportState clientStream = clientStream(stream);
-        if (clientStream != null) {
-          clientStream.transportReportStatus(msg.getStatus(), true, new Metadata());
-          resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
+        Tag tag = clientStream != null ? clientStream.tag() : PerfMark.createTag();
+        PerfMark.startTask("NettyClientHandler.forcefulClose", tag);
+        msg.getLink().link();
+        try {
+          if (clientStream != null) {
+            clientStream.transportReportStatus(msg.getStatus(), true, new Metadata());
+            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
+          }
+          stream.close();
+          return true;
+        } finally {
+          PerfMark.stopTask("NettyClientHandler.forcefulClose", tag);
         }
-        stream.close();
-        return true;
       }
     });
     promise.setSuccess();
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java
index 0949faa..14100c5 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java
@@ -43,7 +43,9 @@
 import io.netty.handler.codec.http2.Http2Headers;
 import io.netty.handler.codec.http2.Http2Stream;
 import io.netty.util.AsciiString;
+import io.perfmark.Link;
 import io.perfmark.PerfMark;
+import io.perfmark.Tag;
 import javax.annotation.Nullable;
 
 /**
@@ -215,9 +217,20 @@
         transportState().requestMessagesFromDeframer(numMessages);
       } else {
         channel.eventLoop().execute(new Runnable() {
+          final Link link = PerfMark.link();
           @Override
           public void run() {
-            transportState().requestMessagesFromDeframer(numMessages);
+            PerfMark.startTask(
+                "NettyClientStream$Sink.requestMessagesFromDeframer",
+                transportState().tag());
+            link.link();
+            try {
+              transportState().requestMessagesFromDeframer(numMessages);
+            } finally {
+              PerfMark.stopTask(
+                  "NettyClientStream$Sink.requestMessagesFromDeframer",
+                  transportState().tag());
+            }
           }
         });
       }
@@ -249,20 +262,25 @@
       implements StreamIdHolder {
     private static final int NON_EXISTENT_ID = -1;
 
+    private final String methodName;
     private final NettyClientHandler handler;
     private final EventLoop eventLoop;
     private int id;
     private Http2Stream http2Stream;
+    private Tag tag;
 
     public TransportState(
         NettyClientHandler handler,
         EventLoop eventLoop,
         int maxMessageSize,
         StatsTraceContext statsTraceCtx,
-        TransportTracer transportTracer) {
+        TransportTracer transportTracer,
+        String methodName) {
       super(maxMessageSize, statsTraceCtx, transportTracer);
+      this.methodName = checkNotNull(methodName, "methodName");
       this.handler = checkNotNull(handler, "handler");
       this.eventLoop = checkNotNull(eventLoop, "eventLoop");
+      tag = PerfMark.createTag(methodName);
     }
 
     @Override
@@ -275,6 +293,7 @@
       checkArgument(id > 0, "id must be positive %s", id);
       checkState(this.id == 0, "id has been previously set: %s", this.id);
       this.id = id;
+      this.tag = PerfMark.createTag(methodName, id);
     }
 
     /**
@@ -359,5 +378,10 @@
     void transportDataReceived(ByteBuf frame, boolean endOfStream) {
       transportDataReceived(new NettyReadableBuffer(frame.retain()), endOfStream);
     }
+
+    @Override
+    public final Tag tag() {
+      return tag;
+    }
   }
 }
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
index de01f7d..2577422 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
@@ -176,7 +176,8 @@
             channel.eventLoop(),
             maxMessageSize,
             statsTraceCtx,
-            transportTracer) {
+            transportTracer,
+            method.getFullMethodName()) {
           @Override
           protected Status statusFromFailedFuture(ChannelFuture f) {
             return NettyClientTransport.this.statusFromFailedFuture(f);
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
index 41a630e..026f4e3 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
@@ -83,6 +83,8 @@
 import io.netty.handler.logging.LogLevel;
 import io.netty.util.AsciiString;
 import io.netty.util.ReferenceCountUtil;
+import io.perfmark.PerfMark;
+import io.perfmark.Tag;
 import java.util.List;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
@@ -427,18 +429,25 @@
           http2Stream,
           maxMessageSize,
           statsTraceCtx,
-          transportTracer);
-      String authority = getOrUpdateAuthority((AsciiString) headers.authority());
-      NettyServerStream stream = new NettyServerStream(
-          ctx.channel(),
-          state,
-          attributes,
-          authority,
-          statsTraceCtx,
-          transportTracer);
-      transportListener.streamCreated(stream, method, metadata);
-      state.onStreamAllocated();
-      http2Stream.setProperty(streamKey, state);
+          transportTracer,
+          method);
+
+      PerfMark.startTask("NettyServerHandler.onHeadersRead", state.tag());
+      try {
+        String authority = getOrUpdateAuthority((AsciiString) headers.authority());
+        NettyServerStream stream = new NettyServerStream(
+            ctx.channel(),
+            state,
+            attributes,
+            authority,
+            statsTraceCtx,
+            transportTracer);
+        transportListener.streamCreated(stream, method, metadata);
+        state.onStreamAllocated();
+        http2Stream.setProperty(streamKey, state);
+      } finally {
+        PerfMark.stopTask("NettyServerHandler.onHeadersRead", state.tag());
+      }
     } catch (Exception e) {
       logger.log(Level.WARNING, "Exception in onHeadersRead()", e);
       // Throw an exception that will get handled by onStreamError.
@@ -463,7 +472,12 @@
     flowControlPing().onDataRead(data.readableBytes(), padding);
     try {
       NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
-      stream.inboundDataReceived(data, endOfStream);
+      PerfMark.startTask("NettyServerHandler.onDataRead", stream.tag());
+      try {
+        stream.inboundDataReceived(data, endOfStream);
+      } finally {
+        PerfMark.stopTask("NettyServerHandler.onDataRead", stream.tag());
+      }
     } catch (Throwable e) {
       logger.log(Level.WARNING, "Exception in onDataRead()", e);
       // Throw an exception that will get handled by onStreamError.
@@ -475,8 +489,13 @@
     try {
       NettyServerStream.TransportState stream = serverStream(connection().stream(streamId));
       if (stream != null) {
-        stream.transportReportStatus(
-            Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
+        PerfMark.startTask("NettyServerHandler.onRstStreamRead", stream.tag());
+        try {
+          stream.transportReportStatus(
+              Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
+        } finally {
+          PerfMark.stopTask("NettyServerHandler.onRstStreamRead", stream.tag());
+        }
       }
     } catch (Throwable e) {
       logger.log(Level.WARNING, "Exception in onRstStreamRead()", e);
@@ -499,12 +518,18 @@
     logger.log(Level.WARNING, "Stream Error", cause);
     NettyServerStream.TransportState serverStream = serverStream(
         connection().stream(Http2Exception.streamId(http2Ex)));
-    if (serverStream != null) {
-      serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
+    Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag();
+    PerfMark.startTask("NettyServerHandler.onStreamError", tag);
+    try {
+      if (serverStream != null) {
+        serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
+      }
+      // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
+      // Delegate to the base class to send a RST_STREAM.
+      super.onStreamError(ctx, outbound, cause, http2Ex);
+    } finally {
+      PerfMark.stopTask("NettyServerHandler.onStreamError", tag);
     }
-    // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
-    // Delegate to the base class to send a RST_STREAM.
-    super.onStreamError(ctx, outbound, cause, http2Ex);
   }
 
   @Override
@@ -623,11 +648,17 @@
    */
   private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
       ChannelPromise promise) throws Http2Exception {
-    if (cmd.endStream()) {
-      closeStreamWhenDone(promise, cmd.streamId());
+    PerfMark.startTask("NettyServerHandler.sendGrpcFrame", cmd.stream().tag());
+    cmd.getLink().link();
+    try {
+      if (cmd.endStream()) {
+        closeStreamWhenDone(promise, cmd.stream().id());
+      }
+      // Call the base class to write the HTTP/2 DATA frame.
+      encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
+    } finally {
+      PerfMark.stopTask("NettyServerHandler.sendGrpcFrame", cmd.stream().tag());
     }
-    // Call the base class to write the HTTP/2 DATA frame.
-    encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise);
   }
 
   /**
@@ -635,26 +666,38 @@
    */
   private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
       ChannelPromise promise) throws Http2Exception {
-    // TODO(carl-mastrangelo): remove this check once https://github.com/netty/netty/issues/6296 is
-    // fixed.
-    int streamId = cmd.stream().id();
-    Http2Stream stream = connection().stream(streamId);
-    if (stream == null) {
-      resetStream(ctx, streamId, Http2Error.CANCEL.code(), promise);
-      return;
+    PerfMark.startTask("NettyServerHandler.sendResponseHeaders", cmd.stream().tag());
+    cmd.getLink().link();
+    try {
+      // TODO(carl-mastrangelo): remove this check once https://github.com/netty/netty/issues/6296
+      // is fixed.
+      int streamId = cmd.stream().id();
+      Http2Stream stream = connection().stream(streamId);
+      if (stream == null) {
+        resetStream(ctx, streamId, Http2Error.CANCEL.code(), promise);
+        return;
+      }
+      if (cmd.endOfStream()) {
+        closeStreamWhenDone(promise, streamId);
+      }
+      encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
+    } finally {
+      PerfMark.stopTask("NettyServerHandler.sendResponseHeaders", cmd.stream().tag());
     }
-    if (cmd.endOfStream()) {
-      closeStreamWhenDone(promise, streamId);
-    }
-    encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
   }
 
   private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
       ChannelPromise promise) {
-    // Notify the listener if we haven't already.
-    cmd.stream().transportReportStatus(cmd.reason());
-    // Terminate the stream.
-    encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
+    PerfMark.startTask("NettyServerHandler.cancelStream", cmd.stream().tag());
+    cmd.getLink().link();
+    try {
+      // Notify the listener if we haven't already.
+      cmd.stream().transportReportStatus(cmd.reason());
+      // Terminate the stream.
+      encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
+    } finally {
+      PerfMark.stopTask("NettyServerHandler.cancelStream", cmd.stream().tag());
+    }
   }
 
   private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
@@ -665,8 +708,14 @@
       public boolean visit(Http2Stream stream) throws Http2Exception {
         NettyServerStream.TransportState serverStream = serverStream(stream);
         if (serverStream != null) {
-          serverStream.transportReportStatus(msg.getStatus());
-          resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
+          PerfMark.startTask("NettyServerHandler.forcefulClose", serverStream.tag());
+          msg.getLink().link();
+          try {
+            serverStream.transportReportStatus(msg.getStatus());
+            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
+          } finally {
+            PerfMark.stopTask("NettyServerHandler.forcefulClose", serverStream.tag());
+          }
         }
         stream.close();
         return true;
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/netty/NettyServerStream.java
index 64ab6a3..5f96701 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerStream.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerStream.java
@@ -33,7 +33,9 @@
 import io.netty.channel.EventLoop;
 import io.netty.handler.codec.http2.Http2Headers;
 import io.netty.handler.codec.http2.Http2Stream;
+import io.perfmark.Link;
 import io.perfmark.PerfMark;
+import io.perfmark.Tag;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -98,10 +100,21 @@
         // Processing data read in the event loop so can call into the deframer immediately
         transportState().requestMessagesFromDeframer(numMessages);
       } else {
+        final Link link = PerfMark.link();
         channel.eventLoop().execute(new Runnable() {
           @Override
           public void run() {
-            transportState().requestMessagesFromDeframer(numMessages);
+            PerfMark.startTask(
+                "NettyServerStream$Sink.requestMessagesFromDeframer",
+                transportState().tag());
+            link.link();
+            try {
+              transportState().requestMessagesFromDeframer(numMessages);
+            } finally {
+              PerfMark.stopTask(
+                  "NettyServerStream$Sink.requestMessagesFromDeframer",
+                  transportState().tag());
+            }
           }
         });
       }
@@ -195,6 +208,7 @@
     private final Http2Stream http2Stream;
     private final NettyServerHandler handler;
     private final EventLoop eventLoop;
+    private final Tag tag;
 
     public TransportState(
         NettyServerHandler handler,
@@ -202,11 +216,13 @@
         Http2Stream http2Stream,
         int maxMessageSize,
         StatsTraceContext statsTraceCtx,
-        TransportTracer transportTracer) {
+        TransportTracer transportTracer,
+        String methodName) {
       super(maxMessageSize, statsTraceCtx, transportTracer);
       this.http2Stream = checkNotNull(http2Stream, "http2Stream");
       this.handler = checkNotNull(handler, "handler");
       this.eventLoop = eventLoop;
+      this.tag = PerfMark.createTag(methodName, http2Stream.id());
     }
 
     @Override
@@ -240,6 +256,11 @@
     public int id() {
       return http2Stream.id();
     }
+
+    @Override
+    public Tag tag() {
+      return tag;
+    }
   }
 
   @Override
diff --git a/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java b/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java
index 4a343e8..5080584 100644
--- a/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java
+++ b/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java
@@ -46,8 +46,8 @@
     return link;
   }
 
-  int streamId() {
-    return stream.id();
+  StreamIdHolder stream() {
+    return stream;
   }
 
   boolean endStream() {
@@ -100,7 +100,7 @@
 
   @Override
   public String toString() {
-    return getClass().getSimpleName() + "(streamId=" + streamId()
+    return getClass().getSimpleName() + "(streamId=" + stream.id()
         + ", endStream=" + endStream + ", content=" + content()
         + ")";
   }
diff --git a/netty/src/main/java/io/grpc/netty/StreamIdHolder.java b/netty/src/main/java/io/grpc/netty/StreamIdHolder.java
index 80e0449..65203c7 100644
--- a/netty/src/main/java/io/grpc/netty/StreamIdHolder.java
+++ b/netty/src/main/java/io/grpc/netty/StreamIdHolder.java
@@ -16,10 +16,14 @@
 
 package io.grpc.netty;
 
+import io.perfmark.Tag;
+
 /** Container for stream ids. */
 interface StreamIdHolder {
   /**
    * Returns the id.
    */
   int id();
+
+  Tag tag();
 }
diff --git a/netty/src/main/java/io/grpc/netty/WriteQueue.java b/netty/src/main/java/io/grpc/netty/WriteQueue.java
index f3ef7ce..89c0990 100644
--- a/netty/src/main/java/io/grpc/netty/WriteQueue.java
+++ b/netty/src/main/java/io/grpc/netty/WriteQueue.java
@@ -112,13 +112,7 @@
       int i = 0;
       boolean flushedOnce = false;
       while ((cmd = queue.poll()) != null) {
-        PerfMark.startTask("WriteQueue.run");
-        try {
-          cmd.getLink().link();
-          cmd.run(channel);
-        } finally {
-          PerfMark.stopTask("WriteQueue.run");
-        }
+        cmd.run(channel);
         if (++i == DEQUE_CHUNK_SIZE) {
           i = 0;
           // Flush each chunk so we are releasing buffers periodically. In theory this loop
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
index 6cdf1ca..e771a93 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
@@ -789,7 +789,13 @@
         EventLoop eventLoop,
         int maxMessageSize,
         TransportTracer transportTracer) {
-      super(handler, eventLoop, maxMessageSize, StatsTraceContext.NOOP, transportTracer);
+      super(
+          handler,
+          eventLoop,
+          maxMessageSize,
+          StatsTraceContext.NOOP,
+          transportTracer,
+          "methodName");
     }
 
     @Override
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java b/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java
index a28e385..13042f2 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java
@@ -550,7 +550,13 @@
 
   private class TransportStateImpl extends NettyClientStream.TransportState {
     public TransportStateImpl(NettyClientHandler handler, int maxMessageSize) {
-      super(handler, channel.eventLoop(), maxMessageSize, StatsTraceContext.NOOP, transportTracer);
+      super(
+          handler,
+          channel.eventLoop(),
+          maxMessageSize,
+          StatsTraceContext.NOOP,
+          transportTracer,
+          "methodName");
     }
 
     @Override
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java b/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java
index ecae24f..39ebf63 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java
@@ -287,7 +287,7 @@
     TransportTracer transportTracer = new TransportTracer();
     NettyServerStream.TransportState state = new NettyServerStream.TransportState(
         handler, channel.eventLoop(), http2Stream, DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx,
-        transportTracer);
+        transportTracer, "method");
     NettyServerStream stream = new NettyServerStream(channel, state, Attributes.EMPTY,
         "test-authority", statsTraceCtx, transportTracer);
     stream.transportState().setListener(serverListener);