#!/usr/bin/python2.7
"""
Copyright (C) 2019 The Android Open Source Project

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

# Run OboeTester with progressive timing changes
# to measure the DSP timing profile.
# Print a CSV table of offsets and glitch counts.
#
# Run Automated Test using an Intent
# https://github.com/google/oboe/blob/master/apps/OboeTester/docs/AutomatedTesting.md

import array
import collections
import os
import os.path
import sys
import subprocess
import time

from datetime import datetime

kPropertyOutputOffset = "aaudio.out_mmap_offset_usec"
kMinPeakAmplitude = 0.04
kOffsetMin = 0000
kOffsetMax = 4000
kOffsetIncrement = 100
kOutputFileBase = "/sdcard/dsp_timing_"
gOutputFile = kOutputFileBase + "now.txt"

def launchLatencyTest():
    command = ["adb", "shell", "am", \
               "start", "-n", "com.google.sample.oboe.manualtest/.MainActivity", \
               "--es", "test", "latency", \
               "--es", "file", gOutputFile, \
               "--ei", "buffer_bursts", "1"]
    return subprocess.check_output(command)

def launchGlitchTest():
    command = ["adb", "shell", "am", \
               "start", "-n", "com.google.sample.oboe.manualtest/.MainActivity", \
               "--es", "test", "glitch", \
               "--es", "file", gOutputFile, \
               "--es", "in_perf", "lowlat", \
               "--es", "out_perf", "lowlat", \
               "--es", "in_sharing", "exclusive", \
               "--es", "out_sharing", "exclusive", \
               "--ei", "buffer_bursts", "1"]
    return subprocess.check_output(command)

def setAndroidProperty(property, value):
    return subprocess.check_output(["adb", "shell", "setprop", property, value])

def getAndroidProperty(property):
    return subprocess.check_output(["adb", "shell", "getprop", property])

def setOutputOffset(offset):
    setAndroidProperty(kPropertyOutputOffset, str(offset))

def getOutputOffset():
    offsetText = getAndroidProperty(kPropertyOutputOffset).strip()
    if len(offsetText) == 0:
        return 0;
    return int(offsetText)

def loadNameValuePairsFromFile(filename):
    myvars = {}
    with open(filename) as myfile:
        for line in myfile:
            name, var = line.partition("=")[::2]
            myvars[name.strip()] = var
    return myvars

def loadNameValuePairsFromString(text):
    myvars = {}
    listOutput = text.splitlines()
    for line in listOutput:
            name, var = line.partition("=")[::2]
            myvars[name.strip()] = var
    return myvars

def waitForTestResult():
    testOutput = ""
    for i in range(10):
        if subprocess.call(["adb", "shell", "ls", gOutputFile, "2>/dev/null"]) == 0:
            testOutput = subprocess.check_output(["adb", "shell", "cat", gOutputFile])
            break
        else:
            print str(i) + ": waiting until test finishes..."
            time.sleep(2)
    # print testOutput
    subprocess.call(["adb", "shell", "rm", gOutputFile])
    return loadNameValuePairsFromString(testOutput)

# volume too low?
# return true if bad
def checkPeakAmplitude(pairs):
    if not pairs.has_key('peak.amplitude'):
        print "ERROR no peak.amplitude"
        return True
    peakAmplitude = float(pairs['peak.amplitude'])
    if peakAmplitude < kMinPeakAmplitude:
        print "ERROR peakAmplitude = " + str(peakAmplitude) \
            + " < " + str(kMinPeakAmplitude) \
            + ", turn up volume"
        return True
    return False

def startOneTest(offset):
    print "=========================="
    setOutputOffset(offset)
    print "try offset = " + getAndroidProperty(kPropertyOutputOffset)
    subprocess.call(["adb", "shell", "rm", gOutputFile, "2>/dev/null"])

def runOneGlitchTest(offset):
    startOneTest(offset)
    print launchGlitchTest()
    pairs = waitForTestResult()
    if checkPeakAmplitude(pairs):
        return -1
    if not pairs.has_key('glitch.count'):
        print "ERROR no glitch.count"
        return -1
    return int(pairs['glitch.count'])

def runOneLatencyTest(offset):
    startOneTest(offset)
    print launchLatencyTest()
    pairs = waitForTestResult()
    if not pairs.has_key('latency.msec'):
        print "ERROR no latency.msec"
        return -1
    return float(pairs['latency.msec'])

def runGlitchSeries():
    offsets = array.array('i')
    glitches = array.array('i')
    for offset in range(kOffsetMin, kOffsetMax, kOffsetIncrement):
        offsets.append(offset)
        result = runOneGlitchTest(offset)
        glitches.append(result)
        if result < 0:
            break
        print "offset = " + str(offset) + ", glitches = " + str(result)
    print "offsetUs, glitches"
    for i in range(len(offsets)):
        print "  " + str(offsets[i]) + ", " + str(glitches[i])

# return true if bad
def checkLatencyValid():
    startOneTest(0)
    print launchLatencyTest()
    pairs = waitForTestResult()
    print "burst = " + pairs['out.burst.frames']
    capacity = int(pairs['out.buffer.capacity.frames'])
    if capacity < 0:
        print "ERROR capacity = " + str(capacity)
        return True
    sampleRate = int(pairs['out.rate'])
    capacityMillis = capacity * 1000.0 / sampleRate
    print "capacityMillis = " + str(capacityMillis)
    if pairs['in.mmap'].strip() != "yes":
        print "ERROR Not using input MMAP"
        return True
    if pairs['out.mmap'].strip() != "yes":
        print "ERROR Not using output MMAP"
        return True
    # Check whether we can change latency a moving the DSP pointer
    # past the CPU pointer and wrapping the buffer.
    latencyMin = runOneLatencyTest(kOffsetMin)
    latencyMax = runOneLatencyTest(kOffsetMax + 1000)
    print "latency = " + str(latencyMin) + " => " + str(latencyMax)
    if latencyMax < (latencyMin + (capacityMillis / 2)):
        print "ERROR Latency not affected by changing offset!"
        return True
    return False

def isRunningAsRoot():
    userName = subprocess.check_output(["adb", "shell", "whoami"]).strip()
    if userName != "root":
        print "WARNING: changing to 'adb root'"
        subprocess.call(["adb", "root"])
        userName = subprocess.check_output(["adb", "shell", "whoami"]).strip()
        if userName != "root":
            print "ERROR: cannot set 'adb root'"
            return False
    return True

def isMMapSupported():
    mmapPolicy = int(getAndroidProperty("aaudio.mmap_policy").strip())
    if mmapPolicy < 2:
        print "ERROR: AAudio MMAP not enabled, aaudio.mmap_policy = " + str(mmapPolicy)
        return False
    if checkLatencyValid():
        return False;
    return True

def isTimingSeriesSupported():
    if not isRunningAsRoot():
        return False
    if not isMMapSupported():
        return False
    return True

def main():
    global gOutputFile
    print "gOutputFile = " + gOutputFile
    now = datetime.now() # current date and time
    gOutputFile = kOutputFileBase \
            + now.strftime("%Y%m%d_%H%M%S") \
            + ".txt"
    print "gOutputFile = " + gOutputFile

    initialOffset = getOutputOffset()
    print "initial offset = " + str(initialOffset)

    print "Android version = " + \
            getAndroidProperty("ro.build.id").strip()
    print "    release " + \
            getAndroidProperty("ro.build.version.release").strip()
    if (isTimingSeriesSupported()):
        runGlitchSeries()
        setOutputOffset(initialOffset)

if __name__ == '__main__':
    main()

