Monday, March 23, 2020

Get Day of Week From Datetime

The following function returns the weekday of a given datetime in one of several formats.
 
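from datetime import datetime
from typing import Optional, Union

def getWeekDayChar(self, date_time: datetime, rt_type: str) -> Optional[Union[int, str]]:
    '''
    Return the day of the week for a datetime in the requested format.

    :param date_time: datetime to inspect
    :param rt_type: 'i' = number (Monday is 0), 'l' = long form, 's' = short form
    :return: day of the week in the specified format, or None for an unknown rt_type
    '''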
  
    week_date_dict: dict = {0: ('Monday', 'Mon'), 1: ('Tuesday', 'Tue'), 2: ('Wednesday', 'Wed'),
                            3: ('Thursday', 'Thu'), 4: ('Friday', 'Fri'),
                            5: ('Saturday', 'Sat'), 6: ('Sunday', 'Sun')}

    week_date_num : int = date_time.weekday()

    if rt_type == 'i' :
        return week_date_num
    elif rt_type == 'l' :
        return week_date_dict[week_date_num][0]  # index the dict and tuple; they are not callable
    elif rt_type == 's' :
        return week_date_dict[week_date_num][1]
    else :
        return None
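
As a cross-check, the same three formats can also be produced with the standard library alone. The sketch below is a standalone alternative, not the function above: the name weekday_label is hypothetical, and it assumes the default (English) locale, since the names returned by strftime depend on the active locale.

```python
from datetime import datetime
from typing import Optional, Union


def weekday_label(date_time: datetime, rt_type: str) -> Optional[Union[int, str]]:
    """Hypothetical helper: return the weekday as a number, long name, or short name."""
    if rt_type == 'i':
        return date_time.weekday()        # Monday is 0, Sunday is 6
    if rt_type == 'l':
        return date_time.strftime('%A')   # full weekday name
    if rt_type == 's':
        return date_time.strftime('%a')   # abbreviated weekday name
    return None                           # unknown format code


if __name__ == '__main__':
    sample = datetime(2020, 3, 23)        # the date of this post, a Monday
    print(weekday_label(sample, 'i'), weekday_label(sample, 'l'), weekday_label(sample, 's'))
```

Letting strftime supply the names avoids maintaining a hand-written lookup table, at the cost of depending on the locale.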

Sunday, March 22, 2020

Retrieve JWT Authentication Access Token Using HttpsURLConnection


In JWT authentication, a token is used to access the API interfaces. In general, it is a two-step process. First, the client authenticates with credentials such as login ID, password, client ID, and client secret, and receives an access token. Second, the client presents that token with each subsequent API request. The following method shows how to get the access token over HTTPS.

    public JwtAuthenticationResponse getHttpsAuthToken(String hostName, int port,
                                                       String userId, String userPassword,
                                                       String grantType, String clientId, String clientSecret,
                                                       String urlPath) {
        HttpsURLConnection httpConn = null;
        final int BUFFER_SIZE = 4096;
        // Create a trust manager that does not validate certificate chains (should be removed in prod env)
        TrustManager[] trustAllCerts = new TrustManager[]{new X509TrustManager() {
            public java.security.cert.X509Certificate[] getAcceptedIssuers() {
                return null;
            }
            public void checkClientTrusted(X509Certificate[] certs, String authType) {
            }
            public void checkServerTrusted(X509Certificate[] certs, String authType) {
            }
        }
        };
        try {
            // Install the all-trusting trust manager
            SSLContext sc = SSLContext.getInstance("SSL");
            sc.init(null, trustAllCerts, new java.security.SecureRandom());
            HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());

            // Create all-trusting host name verifier
            HostnameVerifier allHostsValid = new HostnameVerifier() {
                public boolean verify(String hostname, SSLSession session) {
                    return true;
                }
            };

            HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid);

        } catch (NoSuchAlgorithmException e) {

            logger.debug(String.format("Cannot connect to Hybris server:  %s ;", e.toString()));
            throw new RuntimeException(e);

        } catch (KeyManagementException e) {
            logger.debug(String.format("Cannot connect to Hybris server:  %s ;", e.toString()));
            throw new RuntimeException(e);

        }

        String url = "https://" + hostName + ":" + port + "/services/oauth2/token";
        logger.debug("Get authentication using: " + url);

        InputStream inputStream;

        try {

            StringBuilder data = new StringBuilder ( );

            data.append ("grant_type=" + grantType);
            data.append ("&client_id=" + clientId);
            data.append ("&client_secret=" + clientSecret);

            if (userId != null && !userId.equals ("undefined")) {
                data.append ("&username=" + userId);
            }
            if (userPassword != null && !userPassword.equals ("undefined")) {
                data.append ("&password=" + userPassword);
            }

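            // Basic header from the client credentials. It is built here but never sent;
            // the request below sends authHeaderId instead.
            String auth = clientId + ":" + clientSecret;
            byte[] encodedS = Base64.encodeBase64 (auth.getBytes ( ));

            String authHeader = "Basic " + new String (encodedS);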

            // Create a byte array of the data to be sent
            byte[] encodedAuth = data.toString ( ).getBytes ("UTF-8");
            String authId = userId + ":" + userPassword;
            byte[] encodedId = Base64.encodeBase64 (authId.getBytes ( ));
            String authHeaderId = "Basic " + new String (encodedId);

            // Setup the Request
            URL request = new URL (url);
            httpConn = (HttpsURLConnection) request.openConnection ( );
            httpConn.setRequestMethod ("POST");
            httpConn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
            httpConn.setRequestProperty("Content-Length", "" + encodedAuth.length);
            httpConn.setRequestProperty("Authorization", authHeaderId);
            httpConn.setUseCaches (false);
            httpConn.setDoOutput (true);

            // Write data
            OutputStream postStream = httpConn.getOutputStream ( );
            postStream.write (encodedAuth);
            postStream.flush ( );
            postStream.close ( );
            // For POST only - END
            int responseCode = httpConn.getResponseCode ( );
            String response;
            if (responseCode == 200) {
                inputStream = httpConn.getInputStream ( );
                ObjectMapper mapper = new ObjectMapper ( );
                JsonNode jsonMap = mapper.readTree (inputStream);
                // asText() returns the string value without the surrounding JSON quotes
                String token = jsonMap.get ("access_token").asText ( );
                return new JwtAuthenticationResponse (token);
            } else {
                inputStream = httpConn.getErrorStream ( );
                try (Scanner scanner = new Scanner (inputStream)) {
                    scanner.useDelimiter ("\\Z");
                    response = (scanner.next ( ));
                }

                String exceptionMsg = "Response Code " + String.valueOf (responseCode) + ". " + response;
                throw new UserDefinedException (exceptionMsg);
            }
        } catch (IOException | UserDefinedException e) {

            logger.debug(String.format("Cannot connect to server:  %s ;", e.toString()));
            throw new RuntimeException(e);
        }
    }
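
For comparison, the request that the method above assembles can be sketched in Python without sending anything over the network. This is only an illustration under stated assumptions: the function names build_token_request and parse_access_token are hypothetical, the user ID and password are treated as required, and the form fields are percent-encoded with urlencode, which the Java code above does not do.

```python
import base64
import json
import urllib.parse
import urllib.request


def build_token_request(host_name, port, grant_type, client_id, client_secret,
                        user_id, user_password):
    """Build (but do not send) a token request shaped like the Java method above."""
    fields = {
        'grant_type': grant_type,
        'client_id': client_id,
        'client_secret': client_secret,
        'username': user_id,
        'password': user_password,
    }
    # urlencode percent-encodes reserved characters such as '&' or '=' in values
    body = urllib.parse.urlencode(fields).encode('utf-8')

    # Basic header built from the user credentials, mirroring authHeaderId above
    credentials = '{0}:{1}'.format(user_id, user_password).encode('utf-8')
    basic = base64.b64encode(credentials).decode('ascii')

    url = 'https://{0}:{1}/services/oauth2/token'.format(host_name, port)
    return urllib.request.Request(
        url,
        data=body,
        method='POST',
        headers={
            'Content-Type': 'application/x-www-form-urlencoded',
            'Authorization': 'Basic ' + basic,
        },
    )


def parse_access_token(payload: bytes) -> str:
    """Extract the access_token field from a JSON response body."""
    return json.loads(payload.decode('utf-8'))['access_token']
```

Passing the returned object to urllib.request.urlopen would send the request; the response body could then be handed to parse_access_token. Certificate validation stays enabled in this sketch, unlike the trust-all manager in the Java code.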

Saturday, March 21, 2020

Enable Logging In Python

Python’s logging module is part of the Python Standard Library. The logging module’s basicConfig() function
is the quickest way to configure the desired behavior of your logger. However, applications that use file-based or
dictionary-based logging configuration require a slightly different setup.

One simple way to set up file-based logging is to use a config file and the fileConfig() function from logging.config to read the log configuration properties.

The log config file will have a format like the one below:

[loggers]
keys=root,pipeline

[handlers]
keys=fileHandler

[formatters]
keys=simpleFormatter

[logger_root]
level=DEBUG
handlers=fileHandler

[logger_pipeline]
level=DEBUG
handlers=fileHandler
qualname=pipeline
propagate=0

[handler_fileHandler]
class=FileHandler
level=DEBUG
formatter=simpleFormatter
args=('./log/pipeline.log','a')

[formatter_simpleFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
datefmt=
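
For the dictionary-based route mentioned earlier, roughly the same settings can be expressed as a Python dictionary and passed to logging.config.dictConfig. The sketch below is an approximate translation of the file above, not a tested drop-in replacement; the function name build_log_config and its log_path parameter are hypothetical.

```python
import logging
import logging.config


def build_log_config(log_path='./log/pipeline.log'):
    """Return a dictConfig-style dictionary mirroring the config file above."""
    return {
        'version': 1,                          # required key; 1 is the only supported value
        'disable_existing_loggers': False,
        'formatters': {
            'simpleFormatter': {
                'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            },
        },
        'handlers': {
            'fileHandler': {
                'class': 'logging.FileHandler',
                'level': 'DEBUG',
                'formatter': 'simpleFormatter',
                'filename': log_path,
                'mode': 'a',
            },
        },
        'loggers': {
            'pipeline': {
                'level': 'DEBUG',
                'handlers': ['fileHandler'],
                'propagate': False,            # same effect as propagate=0 in the file
            },
        },
        'root': {
            'level': 'DEBUG',
            'handlers': ['fileHandler'],
        },
    }
```

Calling logging.config.dictConfig(build_log_config()) applies the settings. As with the file-based version, the directory that holds the log file must already exist, because FileHandler does not create it.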

To make it simpler to enable logging inside the application, a utility module can be created to handle the logging setup.

import logging
from logging.config import fileConfig
class AppLogger():
   # Ensure a singleton: we only want one logger per application.
   __instance = None

   # Static method that returns the shared logger instance, creating it on first use.
   # The @staticmethod decorator lets it be called on the class without an instance.
   @staticmethod
   def get_applog():
       if AppLogger.__instance is None:
           AppLogger()

       return AppLogger.__instance

   def __init__(self):
       if AppLogger.__instance is not None:
           raise Exception("Only one application log should be activated at a time")
       AppLogger.__instance = AppLogger.create_applogger()

   @staticmethod
   def create_applogger():
       fileConfig('./config/applog_config.cfg', disable_existing_loggers=False)
       logger = logging.getLogger('pipeline')
       return logger
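
It may help to know that logging.getLogger already returns the same object every time it is called with the same name, so the logger itself is shared even without a wrapper class. What the class above adds is that the config file is loaded exactly once. A minimal sketch of the built-in behavior, using a hypothetical module-level get_applog function:

```python
import logging


def get_applog(name='pipeline'):
    """Hypothetical shortcut that relies on the logging module's per-name cache."""
    return logging.getLogger(name)


first = get_applog()
second = get_applog()
print(first is second)   # True: both names refer to the same Logger object
```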

Now, in any of the modules within the application, we can use the logger by importing the log module.

from util.log import AppLogger

Logger = AppLogger.get_applog()

class TestClass:

    def a_function(self):
        try:
            # code here
            pass
        except Exception as e:
            Logger.error("Error occurred {0}".format(str(e)))
        finally:
            # do something
            pass
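
When the traceback matters, the logger's exception() method can be used inside the except block in place of error(). The sketch below is self-contained and writes to an in-memory stream so it runs without the config file; the logger name pipeline.demo and the function risky_division are hypothetical.

```python
import io
import logging

stream = io.StringIO()                        # in-memory target, so no log file is needed
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter('%(name)s - %(levelname)s - %(message)s'))

demo_logger = logging.getLogger('pipeline.demo')   # hypothetical logger name
demo_logger.addHandler(handler)
demo_logger.setLevel(logging.DEBUG)
demo_logger.propagate = False                 # keep the output out of any parent handlers


def risky_division(numerator, denominator):
    try:
        return numerator / denominator
    except ZeroDivisionError:
        # exception() logs at ERROR level and appends the current traceback
        demo_logger.exception('Division failed for %s / %s', numerator, denominator)
        return None
    finally:
        # runs whether or not the division succeeded
        demo_logger.debug('risky_division finished')


risky_division(1, 0)
print(stream.getvalue())
```

Passing the values as arguments instead of pre-formatting the string lets the logging module skip the formatting work when the message is filtered out by level.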

The Python documentation provides a good reference on how to implement logging: